Posted to commits@hawq.apache.org by rv...@apache.org on 2015/09/19 02:36:21 UTC

[38/51] [partial] incubator-hawq git commit: SGA import

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml b/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
new file mode 100644
index 0000000..43b49ea
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hawq-hadoop</artifactId>
+        <groupId>com.pivotal.hawq</groupId>
+        <version>1.1.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hawq-mapreduce-parquet</artifactId>
+    <packaging>jar</packaging>
+
+    <properties>
+        <parquet.hadoop.version>1.1.0</parquet.hadoop.version>
+        <parquet.format.version>1.0.0</parquet.format.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>sonatype-nexus-snapshots</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+            <releases>
+                <enabled>false</enabled>
+            </releases>
+            <snapshots>
+                <enabled>true</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>hawq-mapreduce-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <!-- Parquet artifacts -->
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>parquet-format</artifactId>
+            <version>${parquet.format.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.hadoop.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-client</artifactId>
+                </exclusion>
+                <!-- snappy-java 1.0.5 raises UnsatisfiedLinkError on old systems -->
+                <exclusion>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>1.1.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.atlassian.maven.plugins</groupId>
+                <artifactId>maven-clover2-plugin</artifactId>
+                <configuration>
+                    <licenseLocation>../lib/clover.license</licenseLocation>
+                    <excludes>
+                        <!-- exclude output format related files -->
+                        <exclude>**/HAWQParquetOutputFormat.java</exclude>
+                        <exclude>**/HAWQSchemaConverter.java</exclude>
+                        <exclude>**/HAWQWriteSupport.java</exclude>
+                        <exclude>**/HAWQRecordWriter.java</exclude>
+                        <!-- ignore the dummy adapter -->
+                        <exclude>**/ParentValueContainerAdapter.java</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
new file mode 100755
index 0000000..a8a6176
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+hadoop fs -rm -r $2
+
+export HADOOP_CLASSPATH=target/test-classes:target/hawq-mapreduce-parquet-1.0.0.jar:../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0.jar:lib/parquet-column-1.3.2.jar:lib/parquet-common-1.3.2.jar:lib/parquet-encoding-1.3.2.jar:lib/parquet-hadoop-1.3.2.jar:lib/parquet-format-1.0.0.jar
+
+# enable debug
+# export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5002"
+hadoop com.pivotal.hawq.mapreduce.parquet.HAWQParquetInputDriver -conf conf/hadoop-localjob.xml $1 $2

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
new file mode 100755
index 0000000..c32e154
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+hadoop fs -rm -r $2
+
+# TODO: HAWQ only supports the 1.0.0 format; can we use a higher version of the parquet library to write files of a specific format version?
+
+export HADOOP_CLASSPATH=target/test-classes:target/hawq-mapreduce-parquet-1.0.0.jar:../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0.jar:lib/parquet-column-1.0.0.jar:lib/parquet-common-1.0.0.jar:lib/parquet-encoding-1.0.0.jar:lib/parquet-hadoop-1.0.0.jar:lib/parquet-format-1.0.0.jar
+
+hadoop com.pivotal.hawq.mapreduce.parquet.HAWQParquetOutputDriver -conf conf/hadoop-localjob.xml $1 $2

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
new file mode 100644
index 0000000..c98de65
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
@@ -0,0 +1,74 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.google.common.collect.Lists;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.file.HAWQFileStatus;
+import com.pivotal.hawq.mapreduce.metadata.HAWQParquetTableMetadata;
+import com.pivotal.hawq.mapreduce.parquet.support.HAWQReadSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import parquet.hadoop.ParquetInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An InputFormat that reads input data from a HAWQ Parquet table.
+ */
+public class HAWQParquetInputFormat extends ParquetInputFormat<HAWQRecord> {
+
+	private static HAWQFileStatus[] hawqFileStatuses;
+
+	public HAWQParquetInputFormat() {
+		super(HAWQReadSupport.class);
+	}
+
+	public static void setInput(Configuration conf, HAWQParquetTableMetadata metadata) {
+		hawqFileStatuses = metadata.getFileStatuses();
+	}
+
+	@Override
+	public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+		if (hawqFileStatuses == null) {
+			throw new IllegalStateException("Please call HAWQParquetInputFormat.setInput first!");
+		}
+
+		if (hawqFileStatuses.length == 0) {
+			return Lists.newArrayList(); // handle empty table
+		}
+
+		return super.getSplits(jobContext);
+	}
+
+	@Override
+	protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+		List<FileStatus> result = Lists.newArrayList();
+		for (HAWQFileStatus hawqFileStatus : hawqFileStatuses) {
+			if (hawqFileStatus.getFileLength() == 0) continue; // skip empty file
+
+			Path path = new Path(hawqFileStatus.getFilePath());
+			FileSystem fs = path.getFileSystem(jobContext.getConfiguration());
+			FileStatus dfsStat = fs.getFileStatus(path);
+			// rewrite the file length because HAWQ records the logical EOF of the file,
+			// which may be smaller than the file's actual EOF
+			FileStatus hawqStat = new FileStatus(
+					hawqFileStatus.getFileLength(), // rewrite to logicalEOF
+					dfsStat.isDirectory(),
+					dfsStat.getReplication(),
+					dfsStat.getBlockSize(),
+					dfsStat.getModificationTime(),
+					dfsStat.getAccessTime(),
+					dfsStat.getPermission(),
+					dfsStat.getOwner(),
+					dfsStat.getGroup(),
+					dfsStat.getPath());
+			result.add(hawqStat);
+		}
+
+		return result;
+	}
+}
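
A minimal usage sketch for the InputFormat above (not part of the commit): it assumes the Hadoop 2 Job API and that a HAWQParquetTableMetadata instance has already been obtained through hawq-mapreduce-common. The sketch class, mapper and output path are placeholders introduced only for illustration; the actual driver used by run-read-job.sh is HAWQParquetInputDriver, which is outside this excerpt.

// ReadJobSketch.java -- hypothetical illustration, not part of this diff.
import com.pivotal.hawq.mapreduce.HAWQRecord;
import com.pivotal.hawq.mapreduce.metadata.HAWQParquetTableMetadata;
import com.pivotal.hawq.mapreduce.parquet.HAWQParquetInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ReadJobSketch {

    // Map-only job: emit each record's string form. ParquetInputFormat delivers (Void, T) pairs.
    static class EchoMapper extends Mapper<Void, HAWQRecord, Text, NullWritable> {
        @Override
        protected void map(Void key, HAWQRecord value, Context ctx)
                throws java.io.IOException, InterruptedException {
            ctx.write(new Text(String.valueOf(value)), NullWritable.get());
        }
    }

    public static Job buildReadJob(Configuration conf, HAWQParquetTableMetadata metadata,
                                   Path output) throws Exception {
        Job job = Job.getInstance(conf, "hawq-parquet-read-sketch");
        job.setJarByClass(ReadJobSketch.class);
        job.setInputFormatClass(HAWQParquetInputFormat.class);
        // must be called before submission, otherwise getSplits() throws IllegalStateException
        HAWQParquetInputFormat.setInput(job.getConfiguration(), metadata);
        job.setMapperClass(EchoMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, output);
        return job;
    }
}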

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
new file mode 100644
index 0000000..83deb5e
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
@@ -0,0 +1,29 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.support.HAWQWriteSupport;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.util.ContextUtil;
+
+class HAWQParquetOutputFormat extends ParquetOutputFormat<HAWQRecord> {
+
+	private static HAWQSchema hawqSchema;
+
+	public HAWQParquetOutputFormat() {
+		super(new HAWQWriteSupport());
+	}
+
+	public static void setSchema(Job job, HAWQSchema schema) {
+		hawqSchema = schema;
+		HAWQWriteSupport.setSchema(ContextUtil.getConfiguration(job), hawqSchema);
+	}
+
+	public static HAWQRecord newRecord() {
+		if (hawqSchema == null) {
+			throw new IllegalStateException("you haven't set HAWQSchema yet");
+		}
+		return new HAWQRecord(hawqSchema);  // TODO reuse record?
+	}
+}
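
HAWQParquetOutputFormat is declared package-private, so callers are expected to live in the same package (run-write-job.sh drives it through HAWQParquetOutputDriver, which is outside this excerpt). The sketch below shows the presumed wiring under the Hadoop 2 Job API; the mapper, paths and the assumption that HAWQRecord.setInt throws HAWQException (inferred from the converter code in this commit) are illustrative, not APIs confirmed by this diff.

// WriteJobSketch.java -- hypothetical illustration, not part of this diff; lives in the same
// package because HAWQParquetOutputFormat is package-private. The HAWQSchema is assumed to be
// built elsewhere with hawq-mapreduce-common.
package com.pivotal.hawq.mapreduce.parquet;

import com.pivotal.hawq.mapreduce.HAWQException;
import com.pivotal.hawq.mapreduce.HAWQRecord;
import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WriteJobSketch {

    // Parses one integer per text line into field 1 of a HAWQRecord (field indexes are 1-based).
    static class ToRecordMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
        @Override
        protected void map(LongWritable key, Text value, Context ctx)
                throws java.io.IOException, InterruptedException {
            try {
                HAWQRecord record = HAWQParquetOutputFormat.newRecord(); // requires setSchema first
                record.setInt(1, Integer.parseInt(value.toString().trim()));
                ctx.write(null, record);
            } catch (HAWQException e) {
                throw new java.io.IOException("failed to build HAWQRecord", e);
            }
        }
    }

    public static Job buildWriteJob(Configuration conf, HAWQSchema schema,
                                    Path input, Path output) throws Exception {
        Job job = Job.getInstance(conf, "hawq-parquet-write-sketch");
        job.setJarByClass(WriteJobSketch.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, input);
        job.setMapperClass(ToRecordMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputFormatClass(HAWQParquetOutputFormat.class);
        HAWQParquetOutputFormat.setSchema(job, schema); // stashed for HAWQWriteSupport.init
        FileOutputFormat.setOutputPath(job, output);
        return job;
    }
}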

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
new file mode 100644
index 0000000..00964a5
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
@@ -0,0 +1,72 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ *   required double x1;
+ *   required double y1;
+ *   required double x2;
+ *   required double y2;
+ * }
+ * => HAWQBox
+ */
+public class HAWQBoxConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private double x1;
+	private double y1;
+	private double x2;
+	private double y2;
+
+	public HAWQBoxConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.converters = new Converter[4];
+		this.converters[0] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x1 = value;
+			}
+		};
+		this.converters[1] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y1 = value;
+			}
+		};
+		this.converters[2] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x2 = value;
+			}
+		};
+		this.converters[3] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y2 = value;
+			}
+		};
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {}
+
+	@Override
+	public void end() {
+		try {
+			HAWQBox box = new HAWQBox(x1, y1, x2, y2);
+			parent.setBox(box);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
new file mode 100644
index 0000000..f7883b6
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
@@ -0,0 +1,64 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQCircle;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ *   required double x;
+ *   required double y;
+ *   required double r;
+ * }
+ * => HAWQCircle
+ */
+public class HAWQCircleConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private double x;
+	private double y;
+	private double r;
+
+	public HAWQCircleConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.converters = new Converter[3];
+		this.converters[0] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x = value;
+			}
+		};
+		this.converters[1] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y = value;
+			}
+		};
+		this.converters[2] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				r = value;
+			}
+		};
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {}
+
+	@Override
+	public void end() {
+		try {
+			HAWQCircle circle = new HAWQCircle(x, y, r);
+			parent.setCircle(circle);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
new file mode 100644
index 0000000..7ad2c34
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
@@ -0,0 +1,72 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQLseg;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ *   required double x1;
+ *   required double y1;
+ *   required double x2;
+ *   required double y2;
+ * }
+ * => HAWQLseg
+ */
+public class HAWQLineSegmentConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private double x1;
+	private double y1;
+	private double x2;
+	private double y2;
+
+	public HAWQLineSegmentConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.converters = new Converter[4];
+		this.converters[0] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x1 = value;
+			}
+		};
+		this.converters[1] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y1 = value;
+			}
+		};
+		this.converters[2] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x2 = value;
+			}
+		};
+		this.converters[3] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y2 = value;
+			}
+		};
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {}
+
+	@Override
+	public void end() {
+		try {
+			HAWQLseg lseg = new HAWQLseg(x1, y1, x2, y2);
+			parent.setLseg(lseg);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
new file mode 100644
index 0000000..bbcdb5f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
@@ -0,0 +1,66 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPath;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * group {
+ *   required boolean is_open;
+ *   repeated group {
+ *     required double x;
+ *     required double y;
+ *   }
+ * }
+ * => HAWQPath
+ */
+public class HAWQPathConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private boolean isOpen;
+	private List<HAWQPoint> points;
+
+	public HAWQPathConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.points = new ArrayList<HAWQPoint>();
+		this.converters = new Converter[2];
+
+		this.converters[0] = new HAWQRecordConverter.HAWQPrimitiveConverter(new ParentValueContainerAdapter() {
+			@Override
+			public void setBoolean(boolean x) throws HAWQException {
+				HAWQPathConverter.this.isOpen = x;
+			}
+		});
+		this.converters[1] = new HAWQPointConverter(new ParentValueContainerAdapter() {
+			@Override
+			public void setPoint(HAWQPoint x) throws HAWQException {
+				HAWQPathConverter.this.points.add(x);
+			}
+		});
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {
+		points.clear();
+	}
+
+	@Override
+	public void end() {
+		try {
+			HAWQPath path = new HAWQPath(isOpen, points);
+			parent.setPath(path);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
new file mode 100644
index 0000000..20e2884
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
@@ -0,0 +1,56 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ *   required double x;
+ *   required double y;
+ * }
+ * => HAWQPoint
+ */
+public class HAWQPointConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private double x;
+	private double y;
+
+	public HAWQPointConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.converters = new Converter[2];
+		this.converters[0] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				x = value;
+			}
+		};
+		this.converters[1] = new PrimitiveConverter() {
+			@Override
+			public void addDouble(double value) {
+				y = value;
+			}
+		};
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {}
+
+	@Override
+	public void end() {
+		try {
+			HAWQPoint point = new HAWQPoint(x, y);
+			parent.setPoint(point);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
new file mode 100644
index 0000000..dbec017
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
@@ -0,0 +1,74 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPolygon;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * group {
+ *   required group boundbox {
+ *     required double x1;
+ *     required double y1;
+ *     required double x2;
+ *     required double y2;
+ *   },
+ *   repeated group points {
+ *     required double x;
+ *     required double y;
+ *   }
+ * }
+ * => HAWQPolygon
+ */
+public class HAWQPolygonConverter extends GroupConverter {
+
+	private ParentValueContainer parent;
+	private Converter[] converters;
+
+	private HAWQBox boundbox;
+	private List<HAWQPoint> points;
+
+	public HAWQPolygonConverter(ParentValueContainer parent) {
+		this.parent = parent;
+		this.converters = new Converter[2];
+		this.points = new ArrayList<HAWQPoint>();
+
+		this.converters[0] = new HAWQBoxConverter(new ParentValueContainerAdapter() {
+			@Override
+			public void setBox(HAWQBox x) throws HAWQException {
+				HAWQPolygonConverter.this.boundbox = x;
+			}
+		});
+
+		this.converters[1] = new HAWQPointConverter(new ParentValueContainerAdapter() {
+			@Override
+			public void setPoint(HAWQPoint x) throws HAWQException {
+				HAWQPolygonConverter.this.points.add(x);
+			}
+		});
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return this.converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {
+		this.boundbox = null;
+		this.points.clear();
+	}
+
+	@Override
+	public void end() {
+		try {
+			HAWQPolygon polygon = new HAWQPolygon(points, boundbox);
+			parent.setPolygon(polygon);
+		} catch (HAWQException e) {}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
new file mode 100644
index 0000000..96df594
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
@@ -0,0 +1,552 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import com.pivotal.hawq.mapreduce.util.HAWQConvertUtil;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.MessageType;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * HAWQ's implementation of Parquet's GroupConverter.
+ */
+public class HAWQRecordConverter extends GroupConverter {
+
+	private final ParentValueContainer parent;
+	private HAWQSchema hawqSchema;
+	private final Converter[] converters;
+	private HAWQRecord currentRecord;
+
+	// TODO maybe HAWQRecordConverter(HAWQSchema requestedSchema, HAWQSchema hawqSchema) ?
+	public HAWQRecordConverter(MessageType requestedSchema, HAWQSchema hawqSchema) {
+		this(null, requestedSchema, hawqSchema);
+	}
+
+	public HAWQRecordConverter(ParentValueContainer parent, MessageType requestedSchema, HAWQSchema hawqSchema) {
+		this.parent = parent;
+		this.hawqSchema = hawqSchema;
+
+		int fieldsNum = hawqSchema.getFieldCount();
+		this.converters = new Converter[fieldsNum];
+
+		int fieldIndex = 0;  // index of converter starts from 0
+		for (HAWQField field : hawqSchema.getFields()) {
+			final int recordFieldIndex = fieldIndex + 1;  // index in HAWQRecord starts from 1
+			Converter fieldConverter = newConverter(field, new ParentValueContainer() {
+				@Override
+				public void setBoolean(boolean x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setBoolean(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setBit(HAWQVarbit x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setBit(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setByte(byte x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setByte(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setBytes(byte[] x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setBytes(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setShort(short x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setShort(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setInt(int x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setInt(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setLong(long x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setLong(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setFloat(float x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setFloat(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setDouble(double x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setDouble(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setBigDecimal(BigDecimal x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setBigDecimal(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setString(String x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setString(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setDate(Date x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setDate(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setTime(Time x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setTime(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setTimestamp(Timestamp x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setTimestamp(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setInterval(HAWQInterval x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setInterval(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setPoint(HAWQPoint x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setPoint(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setLseg(HAWQLseg x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setLseg(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setBox(HAWQBox x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setBox(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setCircle(HAWQCircle x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setCircle(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setPath(HAWQPath x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setPath(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setPolygon(HAWQPolygon x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setPolygon(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setMacaddr(HAWQMacaddr x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setMacaddr(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setInet(HAWQInet x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setInet(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setCidr(HAWQCidr x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setCidr(recordFieldIndex, x);
+				}
+
+				@Override
+				public void setArray(Array x) throws HAWQException {
+					throw new UnsupportedOperationException();  // TODO
+				}
+
+				@Override
+				public void setField(HAWQRecord x) throws HAWQException {
+					HAWQRecordConverter.this.currentRecord.setField(recordFieldIndex, x);
+				}
+			});
+			this.converters[fieldIndex++] = fieldConverter;
+		}
+	}
+
+	private Converter newConverter(HAWQField hawqType, ParentValueContainer parent) {
+		if (!hawqType.isPrimitive())  // FIXME
+			throw new RuntimeException("HAWQRecordConverter.newConverter not implement group type converter");
+
+		switch (hawqType.asPrimitive().getType()) {
+			case BIT:case VARBIT:
+				return new HAWQBitsConverter(parent);
+			case BYTEA:
+				return new HAWQByteArrayConverter(parent);
+			/* number related type */
+			case BOOL:case INT4:case INT8:case FLOAT4:case FLOAT8:
+				return new HAWQPrimitiveConverter(parent);
+			case INT2:
+				return new HAWQShortConverter(parent);
+			case NUMERIC:
+				return new HAWQBigDecimalConverter(parent);
+			/* string related type */
+			case BPCHAR:case VARCHAR:case TEXT:case XML:
+				return new HAWQStringConverter(parent);
+			/* time related type */
+			case DATE:
+				return new HAWQDateConverter(parent);
+			case TIME:
+				return new HAWQTimeConverter(parent);
+			case TIMETZ:
+				return new HAWQTimeTZConverter(parent);
+			case TIMESTAMP:
+				return new HAWQTimestampConverter(parent);
+			case TIMESTAMPTZ:
+				return new HAWQTimestampTZConverter(parent);
+			case INTERVAL:
+				return new HAWQIntervalConverter(parent);
+			/* geometry related type */
+			case POINT:
+				return new HAWQPointConverter(parent);
+			case LSEG:
+				return new HAWQLineSegmentConverter(parent);
+			case PATH:
+				return new HAWQPathConverter(parent);
+			case BOX:
+				return new HAWQBoxConverter(parent);
+			case POLYGON:
+				return new HAWQPolygonConverter(parent);
+			case CIRCLE:
+				return new HAWQCircleConverter(parent);
+			/* other type */
+			case MACADDR:
+				return new HAWQMacaddrConverter(parent);
+			case INET:
+				return new HAWQInetConverter(parent);
+			case CIDR:
+				return new HAWQCidrConverter(parent);
+			default:
+				throw new UnsupportedOperationException("unsupported type " + hawqType);
+		}
+	}
+
+	@Override
+	public Converter getConverter(int fieldIndex) {
+		return converters[fieldIndex];
+	}
+
+	@Override
+	public void start() {
+		currentRecord = new HAWQRecord(hawqSchema);
+	}
+
+	@Override
+	public void end() {
+		if (parent != null) {
+			try {
+				parent.setField(currentRecord);
+			} catch (HAWQException e) {}
+		}
+	}
+
+	public HAWQRecord getCurrentRecord() {
+		return currentRecord;
+	}
+
+	//////////////////////////////////////////////////////////////
+	/// converters from parquet data type to HAWQ data type
+	//////////////////////////////////////////////////////////////
+
+	static class HAWQPrimitiveConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQPrimitiveConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBoolean(boolean value) {
+			try {
+				parent.setBoolean(value);
+			} catch (HAWQException e) {}
+		}
+
+		@Override
+		public void addInt(int value) {
+			try {
+				parent.setInt(value);
+			} catch (HAWQException e) {}
+		}
+
+		@Override
+		public void addLong(long value) {
+			try {
+				parent.setLong(value);
+			} catch (HAWQException e) {}
+		}
+
+		@Override
+		public void addFloat(float value) {
+			try {
+				parent.setFloat(value);
+			} catch (HAWQException e) {}
+		}
+
+		@Override
+		public void addDouble(double value) {
+			try {
+				parent.setDouble(value);
+			} catch (HAWQException e) {}
+		}
+	}
+
+	static class HAWQShortConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQShortConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addInt(int value) {
+			try {
+				parent.setShort((short) value);
+			} catch (HAWQException e) {}
+		}
+	}
+
+	static class HAWQBigDecimalConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQBigDecimalConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				// FIXME handle the case where bytesToDecimal returns "NAN"
+				parent.setBigDecimal((BigDecimal) HAWQConvertUtil.bytesToDecimal(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to BigDecimal", e);
+			}
+		}
+	}
+
+	static class HAWQStringConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQStringConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setString(value.toStringUsingUTF8());
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to String", e);
+			}
+		}
+	}
+
+	static class HAWQBitsConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQBitsConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setBit(HAWQConvertUtil.bytesToVarbit(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to Varbit");
+			}
+		}
+	}
+
+	static class HAWQByteArrayConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQByteArrayConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setBytes(value.getBytes());
+			} catch (HAWQException e) {}
+		}
+	}
+
+	// date is stored as a 4-byte int
+	static class HAWQDateConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQDateConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addInt(int value) {
+			try {
+				parent.setDate(HAWQConvertUtil.toDate(value));
+
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Integer to Date", e);
+			}
+		}
+	}
+
+	// time (without timezone) is stored as an 8-byte long
+	static class HAWQTimeConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQTimeConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addLong(long value) {
+			try {
+				parent.setTime(HAWQConvertUtil.toTime(value));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Long to Time", e);
+			}
+		}
+	}
+
+	// time (with timezone) is stored as a 12-byte binary
+	static class HAWQTimeTZConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQTimeTZConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setTime(HAWQConvertUtil.toTimeTz(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to Time (with timezone)", e);
+			}
+		}
+	}
+
+	// timestamp (without timezone) is stored as an 8-byte long
+	static class HAWQTimestampConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQTimestampConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addLong(long value) {
+			try {
+				parent.setTimestamp(HAWQConvertUtil.toTimestamp(value, false));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Long to Timestamp", e);
+			}
+		}
+	}
+
+	// timestamp (with timezone) is stored as an 8-byte long
+	static class HAWQTimestampTZConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQTimestampTZConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addLong(long value) {
+			try {
+				parent.setTimestamp(HAWQConvertUtil.toTimestamp(value, true));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Long to Timestamp (with timezone)", e);
+			}
+		}
+	}
+
+	// interval is stored as a 16-byte binary
+	static class HAWQIntervalConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQIntervalConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setInterval(HAWQConvertUtil.bytesToInterval(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to Interval", e);
+			}
+		}
+	}
+
+	// macaddr is stored as binary
+	static class HAWQMacaddrConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		public HAWQMacaddrConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setMacaddr(new HAWQMacaddr(value.getBytes()));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to HAWQMacaddr");
+			}
+		}
+	}
+
+	// inet is stored as binary
+	static class HAWQInetConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		HAWQInetConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setInet(HAWQConvertUtil.bytesToInet(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to HAWQInet");
+			}
+		}
+	}
+
+	// cidr is stored as binary
+	static class HAWQCidrConverter extends PrimitiveConverter {
+		private ParentValueContainer parent;
+
+		HAWQCidrConverter(ParentValueContainer parent) {
+			this.parent = parent;
+		}
+
+		@Override
+		public void addBinary(Binary value) {
+			try {
+				parent.setCidr(HAWQConvertUtil.bytesToCidr(value.getBytes(), 0));
+			} catch (HAWQException e) {
+				throw new RuntimeException("error during conversion from Binary to HAWQCidr");
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
new file mode 100644
index 0000000..fd7000f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
@@ -0,0 +1,33 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * HAWQ's implementation of the RecordMaterializer interface.
+ *
+ * <p>This class materializes HAWQRecord objects from a stream of Parquet data,
+ * using a HAWQRecordConverter internally to convert data types between HAWQ and Parquet.
+ *
+ */
+public class HAWQRecordMaterializer extends RecordMaterializer<HAWQRecord> {
+
+	private HAWQRecordConverter rootConverter;
+
+	public HAWQRecordMaterializer(MessageType requestedSchema, HAWQSchema hawqSchema) {
+		rootConverter = new HAWQRecordConverter(requestedSchema, hawqSchema);
+	}
+
+	@Override
+	public HAWQRecord getCurrentRecord() {
+		return rootConverter.getCurrentRecord();
+	}
+
+	@Override
+	public GroupConverter getRootConverter() {
+		return rootConverter;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
new file mode 100644
index 0000000..cb38157
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
@@ -0,0 +1,93 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQGroupField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+
+/**
+ * Converts a HAWQRecord into its Parquet structure and writes it to a RecordConsumer.
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public class HAWQRecordWriter {
+
+	private RecordConsumer consumer;
+	private HAWQSchema schema;
+
+	public HAWQRecordWriter(RecordConsumer consumer, HAWQSchema schema) {
+		this.consumer = consumer;
+		this.schema = schema;
+	}
+
+	public void writeRecord(HAWQRecord record) throws HAWQException {
+		consumer.startMessage();
+		for (int i = 1; i <= schema.getFieldCount(); i++) {
+			writeField(schema.getField(i), i, record.getObject(i));
+		}
+		consumer.endMessage();
+	}
+
+	private void writeField(HAWQField fieldSchema, int fieldIndex, Object value) throws HAWQException {
+		if (value == null) {
+			if (fieldSchema.isOptional()) return;
+			else throw new HAWQException("missing value for required field " + fieldSchema.getName());
+		}
+
+		consumer.startField(fieldSchema.getName(), fieldIndex - 1);
+		if (fieldSchema.isPrimitive()) {
+			writePrimitive(fieldSchema.asPrimitive(), value);
+		} else {
+			writeGroup(fieldSchema.asGroup(), (HAWQRecord) value);
+		}
+		consumer.endField(fieldSchema.getName(), fieldIndex - 1);
+	}
+
+	private void writeGroup(HAWQGroupField groupFieldSchema, HAWQRecord value) throws HAWQException {
+		consumer.startGroup();
+		for (int i = 1; i <= groupFieldSchema.getFieldCount(); i++) {
+			writeField(groupFieldSchema.getField(i), i, value.getObject(i));
+		}
+		consumer.endGroup();
+	}
+
+	private void writePrimitive(HAWQPrimitiveField primitiveFieldSchema, Object value) {
+		// TODO DATE and TIME are accepted but not yet written below; remaining types are rejected
+		switch (primitiveFieldSchema.getType()) {
+			case BOOL:
+				consumer.addBoolean((Boolean) value);
+				break;
+			case BYTEA:
+				consumer.addBinary(Binary.fromByteArray((byte[]) value));
+				break;
+			case INT2:
+				consumer.addInteger(((Short) value).intValue());
+				break;
+			case INT4:
+				consumer.addInteger((Integer) value);
+				break;
+			case INT8:
+				consumer.addLong((Long) value);
+				break;
+			case FLOAT4:
+				consumer.addFloat((Float) value);
+				break;
+			case FLOAT8:
+				consumer.addDouble((Double) value);
+				break;
+			case VARCHAR:case TEXT:
+				consumer.addBinary(Binary.fromString((String) value));
+				break;
+			case DATE:
+				break;
+			case TIME:
+				break;
+			default:
+				throw new RuntimeException("unsupported type in HAWQRecordWriter");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
new file mode 100644
index 0000000..b5363e6
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
@@ -0,0 +1,83 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public interface ParentValueContainer {
+
+	///////////////////////////////////////////////////////////////////////
+	// BOOL, BIT, VARBIT, BYTEA, INT2, INT4, INT8, FLOAT4, FLOAT8, NUMERIC
+	///////////////////////////////////////////////////////////////////////
+
+	void setBoolean(boolean x) throws HAWQException;
+
+	void setBit(HAWQVarbit x) throws HAWQException;
+
+	void setByte(byte x) throws HAWQException;
+
+	void setBytes(byte[] x) throws HAWQException;
+
+	void setShort(short x) throws HAWQException;
+
+	void setInt(int x) throws HAWQException;
+
+	void setLong(long x) throws HAWQException;
+
+	void setFloat(float x) throws HAWQException;
+
+	void setDouble(double x) throws HAWQException;
+
+	void setBigDecimal(BigDecimal x) throws HAWQException;
+
+	///////////////////////////////////////////////////////////////////////
+	// CHAR, BPCHAR, VARCHAR, TEXT, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, INTERVAL
+	///////////////////////////////////////////////////////////////////////
+
+	void setString(String x) throws HAWQException;
+
+	void setDate(Date x) throws HAWQException;
+
+	void setTime(Time x) throws HAWQException;
+
+	void setTimestamp(Timestamp x) throws HAWQException;
+
+	void setInterval(HAWQInterval x) throws HAWQException;
+
+	///////////////////////////////////////////////////////////////////////
+	// POINT, LSEG, BOX, CIRCLE, PATH, POLYGON, MACADDR, INET, CIDR, XML
+	///////////////////////////////////////////////////////////////////////
+
+	void setPoint(HAWQPoint x) throws HAWQException;
+
+	void setLseg(HAWQLseg x) throws HAWQException;
+
+	void setBox(HAWQBox x) throws HAWQException;
+
+	void setCircle(HAWQCircle x) throws HAWQException;
+
+	void setPath(HAWQPath x) throws HAWQException;
+
+	void setPolygon(HAWQPolygon x) throws HAWQException;
+
+	void setMacaddr(HAWQMacaddr x) throws HAWQException;
+
+	void setInet(HAWQInet x) throws HAWQException;
+
+	void setCidr(HAWQCidr x) throws HAWQException;
+
+	///////////////////////////////////////////////////////////////////////
+	// other
+	///////////////////////////////////////////////////////////////////////
+
+	void setArray(Array x) throws HAWQException;
+
+	void setField(HAWQRecord x) throws HAWQException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
new file mode 100644
index 0000000..3ae4ac1
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
@@ -0,0 +1,143 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public abstract class ParentValueContainerAdapter implements ParentValueContainer {
+	@Override
+	public void setBoolean(boolean x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setBit(HAWQVarbit x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setByte(byte x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setBytes(byte[] x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setShort(short x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setInt(int x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setLong(long x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setFloat(float x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setDouble(double x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setBigDecimal(BigDecimal x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setString(String x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setDate(Date x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setTime(Time x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setTimestamp(Timestamp x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setInterval(HAWQInterval x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setPoint(HAWQPoint x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setLseg(HAWQLseg x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setBox(HAWQBox x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setCircle(HAWQCircle x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setPath(HAWQPath x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setPolygon(HAWQPolygon x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setMacaddr(HAWQMacaddr x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setInet(HAWQInet x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setCidr(HAWQCidr x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setArray(Array x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void setField(HAWQRecord x) throws HAWQException {
+		throw new UnsupportedOperationException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
new file mode 100644
index 0000000..bdeb63e
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
@@ -0,0 +1,50 @@
+package com.pivotal.hawq.mapreduce.parquet.support;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.convert.HAWQRecordMaterializer;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * User: gaod1
+ * Date: 8/8/13
+ */
+public class HAWQReadSupport extends ReadSupport<HAWQRecord> {
+
+	private static final String KEY_HAWQ_SCHEMA = "hawq.schema";
+	private static final String HAWQ_REQUESTED_SCHEMA = "hawq.schema.requested";
+
+	@Override
+	public ReadContext init(Configuration configuration,
+							Map<String, String> keyValueMetaData,
+							MessageType fileSchema) {
+
+//		String requestedProjectionString = configuration.get(HAWQ_REQUESTED_SCHEMA);
+//
+//		if (requestedProjectionString == null) { // read all data
+//			return new ReadContext(fileSchema);
+//		}
+//
+//		HAWQSchema requestedHAWQSchema = HAWQSchema.fromString(requestedProjectionString);
+//		MessageType requestedSchema = HAWQSchemaConverter.convertToParquet(requestedHAWQSchema);
+//		return new ReadContext(requestedSchema);
+
+		return new ReadContext(fileSchema);
+	}
+
+	@Override
+	public RecordMaterializer<HAWQRecord> prepareForRead(Configuration configuration,
+														 Map<String, String> keyValueMetaData,
+														 MessageType fileSchema, ReadContext readContext) {
+
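+		// Recover the HAWQ schema string that the writer stored in the file's extra metadata.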
+		HAWQSchema hawqSchema = HAWQSchema.fromString(keyValueMetaData.get(KEY_HAWQ_SCHEMA));
+		return new HAWQRecordMaterializer(
+				readContext.getRequestedSchema(), // requested parquet schema
+				hawqSchema); // corresponding requested HAWQSchema
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
new file mode 100644
index 0000000..6248075
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
@@ -0,0 +1,57 @@
+package com.pivotal.hawq.mapreduce.parquet.support;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.convert.HAWQRecordWriter;
+import com.pivotal.hawq.mapreduce.parquet.util.HAWQSchemaConverter;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * WriteSupport implementation that writes {@link HAWQRecord}s into HAWQ parquet files.
+ *
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public class HAWQWriteSupport extends WriteSupport<HAWQRecord> {
+
+	// key under which the HAWQ schema is stored in the extra key/value metadata of HAWQ's parquet files
+	private static final String HAWQ_SCHEMA_KEY = "hawq.schema";
+
+	private HAWQSchema hawqSchema;
+	private MessageType parquetSchema;
+	private HAWQRecordWriter recordWriter;
+
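+	// Stores the HAWQ schema in the job configuration so that init() can pick it up
+	// on the task side; the "parquet.hawq.schema" key must match the one read in init().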
+	public static void setSchema(Configuration configuration, HAWQSchema hawqSchema) {
+		configuration.set("parquet.hawq.schema", hawqSchema.toString());
+	}
+
+	@Override
+	public WriteContext init(Configuration configuration) {
+		hawqSchema = HAWQSchema.fromString(configuration.get("parquet.hawq.schema"));
+		parquetSchema = HAWQSchemaConverter.convertToParquet(hawqSchema);
+
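+		// Embed the HAWQ schema string in the file footer's extra metadata so that
+		// readers (HAWQReadSupport) can recover it later.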
+		Map<String, String> extraMetaData = new HashMap<String, String>();
+		extraMetaData.put(HAWQ_SCHEMA_KEY, hawqSchema.toString());
+
+		return new WriteContext(parquetSchema, extraMetaData);
+	}
+
+	@Override
+	public void prepareForWrite(RecordConsumer recordConsumer) {
+		recordWriter = new HAWQRecordWriter(recordConsumer, hawqSchema);
+	}
+
+	@Override
+	public void write(HAWQRecord record) {
+		try {
+			recordWriter.writeRecord(record);
+		} catch (HAWQException e) {
+			throw new RuntimeException("failed to write record", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
new file mode 100644
index 0000000..73a96b9
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
@@ -0,0 +1,76 @@
+package com.pivotal.hawq.mapreduce.parquet.util;
+
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts a HAWQSchema into the equivalent Parquet {@link MessageType}.
+ *
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public final class HAWQSchemaConverter {
+
+	/**
+	 * Convert from HAWQ schema to Parquet schema.
+	 *
+	 * @param hawqSchema schema to be converted.
+	 * @return corresponding Parquet schema.
+	 */
+	public static MessageType convertToParquet(HAWQSchema hawqSchema) {
+		List<Type> parquetFields = new ArrayList<Type>();
+		for (HAWQField hawqField : hawqSchema.getFields()) {
+			parquetFields.add(convertField(hawqField.asPrimitive()));
+		}
+		return new MessageType(hawqSchema.getName(), parquetFields);
+	}
+
+	private static Type convertField(HAWQPrimitiveField hawqField) {
+		// FIXME: user-defined types are not handled yet; every field is treated as primitive
+		String name = hawqField.getName();
+		Repetition repetition = getRepetition(hawqField);
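+		// Map each HAWQ primitive type onto the closest Parquet primitive type.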
+		switch (hawqField.getType()) {
+			case BOOL:
+				return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+			case BYTEA:
+				return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+			case INT2:
+			case INT4:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+			case INT8:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+			case FLOAT4:
+				return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+			case FLOAT8:
+				return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+			case VARCHAR:
+				return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name, OriginalType.UTF8);
+			/* time-related type */
+			case DATE:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+			case TIME:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+			case TIMETZ:
+				return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+			case TIMESTAMP:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+			case TIMESTAMPTZ:
+				return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+			case INTERVAL:
+				return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+			default:
+				throw new RuntimeException("unsupported hawq type: " + hawqField.getType().name());
+		}
+	}
+
+	private static Repetition getRepetition(HAWQField field) {
+		return field.isOptional() ? Repetition.OPTIONAL : Repetition.REQUIRED;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
new file mode 100644
index 0000000..245aba7
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
@@ -0,0 +1,189 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+
+/**
+ * Driver job that writes HAWQ parquet files using {@link HAWQParquetOutputFormat}.
+ *
+ * User: gaod1
+ * Date: 9/16/13
+ */
+public class HAWQParquetOutputDriver extends Configured implements Tool {
+
+	public static void main(String[] args) throws Exception {
+		int exitCode = ToolRunner.run(new HAWQParquetOutputDriver(), args);
+		System.exit(exitCode);
+	}
+
+	@Override
+	public int run(String[] args) throws Exception {
+		Job job = new Job(getConf(), "HAWQParquetOutputFormat");
+		job.setJarByClass(HAWQParquetOutputDriver.class);
+
+		job.setOutputFormatClass(HAWQParquetOutputFormat.class);
+
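+		// Only one schema/mapper pair below is active at a time; the commented-out
+		// variants exercise other HAWQ primitive types and can be swapped in by hand.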
+		/*
+		// int2 int4 int8
+		HAWQSchema schema = new HAWQSchema("t_int",
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.INT2, "col_short"),
+				HAWQSchema.optional_field(HAWQPrimitiveField.PrimitiveType.INT4, "col_int"),
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.INT8, "col_long")
+		);
+		job.setMapperClass(WriteIntMapper.class);
+		*/
+
+		/*
+		// varchar
+		HAWQSchema schema = new HAWQSchema("t_varchar",
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.VARCHAR, "col_varchar")
+		);
+		job.setMapperClass(WriteVarcharMapper.class);
+		*/
+
+		/*
+		// float4 float8
+		HAWQSchema schema = new HAWQSchema("t_floating",
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.FLOAT4, "col_float"),
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.FLOAT8, "col_long")
+		);
+		job.setMapperClass(WriteFloatingNumberMapper.class);
+		*/
+
+		// boolean
+//		HAWQSchema schema = new HAWQSchema("t_boolean",
+//				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.BOOL, "col_bool"));
+//		job.setMapperClass(WriteBooleanMapper.class);
+
+		// byte array
+		HAWQSchema schema = new HAWQSchema("t_bytea",
+				HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.BYTEA, "col_bytea"));
+		job.setMapperClass(WriteByteArrayMapper.class);
+
+		HAWQParquetOutputFormat.setSchema(job, schema);
+
+		FileInputFormat.addInputPath(job, new Path(args[0]));
+		HAWQParquetOutputFormat.setOutputPath(job, new Path(args[1]));
+
+		job.setNumReduceTasks(0);
+
+		job.setMapOutputKeyClass(Void.class);
+		job.setMapOutputValueClass(HAWQRecord.class);
+
+		return job.waitForCompletion(true) ? 0 : 1;
+	}
+
+	private static class WriteIntMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+
+		private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
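+		// A single HAWQRecord instance is reused across rows; reset() clears it before each write.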
+
+		@Override
+		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+			int recordNum = Integer.parseInt(value.toString());
+			try {
+				for (int i = 0; i < recordNum; i++) {
+					record.reset();
+					record.setShort(1, (short) (i + 1));
+					if (i % 2 == 0) {
+						record.setInt(2, i);
+					}
+					record.setLong(3, i * 100);
+					context.write(null, record);
+				}
+
+			} catch (HAWQException e) {
+				throw new IOException(e);
+			}
+		}
+	}
+
+	private static class WriteVarcharMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+		private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+		@Override
+		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+			int recordNum = Integer.parseInt(value.toString());
+			try {
+				for (int i = 0; i < recordNum; i++) {
+					record.reset();
+					record.setString(1, "hello" + i);
+					context.write(null, record);
+				}
+
+			} catch (HAWQException e) {
+				throw new IOException(e);
+			}
+		}
+	}
+
+	private static class WriteFloatingNumberMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+		private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+		@Override
+		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+			int recordNum = Integer.parseInt(value.toString());
+			try {
+				for (int i = 0; i < recordNum; i++) {
+					record.reset();
+					record.setFloat(1, 1.0f * i);
+					record.setDouble(2, 2 * Math.PI * i);
+					context.write(null, record);
+				}
+
+			} catch (HAWQException e) {
+				throw new IOException(e);
+			}
+		}
+	}
+
+	private static class WriteBooleanMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+		private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+		@Override
+		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+			int recordNum = Integer.parseInt(value.toString());
+			try {
+				for (int i = 0; i < recordNum; i++) {
+					record.reset();
+					record.setBoolean(1, i % 2 == 0);
+					context.write(null, record);
+				}
+
+			} catch (HAWQException e) {
+				throw new IOException(e);
+			}
+		}
+	}
+
+	private static class WriteByteArrayMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+		private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+		@Override
+		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+			int recordNum = Integer.parseInt(value.toString());
+			try {
+				for (int i = 0; i < recordNum; i++) {
+					record.reset();
+					record.setBytes(1, String.format("hello %d", i).getBytes());
+					context.write(null, record);
+				}
+
+			} catch (HAWQException e) {
+				throw new IOException(e);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
new file mode 100644
index 0000000..1947534
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
@@ -0,0 +1,142 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+
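+/**
+ * Manual inspection utility that dumps the metadata and raw column page data of a
+ * HAWQ parquet file; run it via main() against a local file path.
+ */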
+public class TestHAWQParquetOutput {
+	private ParquetMetadata readFooter;
+	private ParquetFileReader reader;
+	private Configuration configuration;
+	private Path path;
+	private MessageType schema;
+
+	public TestHAWQParquetOutput(String filepath) throws Exception {
+		File parquetFile = new File(filepath).getAbsoluteFile();
+		path = new Path(parquetFile.toURI());
+		configuration = new Configuration();
+		readFooter = ParquetFileReader.readFooter(configuration, path);
+		schema = readFooter.getFileMetaData().getSchema();
+	}
+
+	public int getRowGroupCount() {
+		int count = readFooter.getBlocks().size();
+		System.out.println("-------------RowGroup Count: " + count);
+		return count;
+	}
+
+	public long getRowCount(int rowGroupIndex) {
+		long count = readFooter.getBlocks().get(rowGroupIndex).getRowCount();
+		System.out.println("-------------Row Count in RowGroup" + rowGroupIndex
+				+ ": " + count);
+		return count;
+	}
+
+	public int getColumnCountInRowGroup(int rowGroupIndex) {
+		int count = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+				.size();
+		System.out.println("-------------Column Count in RowGroup"
+				+ rowGroupIndex + ": " + count);
+		return count;
+	}
+
+	public void getColumnType(int rowGroupIndex, int columnIndex) {
+		String type = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+				.get(columnIndex).getType().toString();
+		System.out.println("-------------Type of Column" + columnIndex
+				+ " in RowGroup" + rowGroupIndex + ": " + type);
+	}
+
+	public void getColumnCodec(int rowGroupIndex, int columnIndex) {
+		String codec = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+				.get(columnIndex).getCodec().toString();
+		System.out.println("-------------Codec of Column" + columnIndex
+				+ " in RowGroup" + rowGroupIndex + ": " + codec);
+	}
+
+	public void getMetadata() {
+		String metadata = readFooter.toString();
+		System.out.println(metadata);
+	}
+
+	public void getColumnDataInRowGroup(int rowGroupIndex, int columnIndex)
+			throws Exception {
+		PageReadStore pages = null;
+		int valueCount;
+		reader = new ParquetFileReader(configuration, path,
+				readFooter.getBlocks(), schema.getColumns());
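+		// readNextRowGroup() advances one row group per call, so keep reading until the requested group is reached.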
+		for (int i = 0; i <= rowGroupIndex; ++i) {
+			pages = reader.readNextRowGroup();
+		}
+		PageReader pageReader = pages.getPageReader(schema.getColumns().get(
+				columnIndex));
+		Page page = pageReader.readPage();
+
+		// read out all the data pages
+		while (page != null) {
+			valueCount = page.getValueCount();
+			System.out.println("-------------value count of Column" + columnIndex
+					+ " in RowGroup" + rowGroupIndex + ": " + valueCount);
+			printIntFromByteArray(page.getBytes().toByteArray());
+			System.out.println("");
+			page = pageReader.readPage();
+		}
+		reader.close();
+	}
+
+	public void printIntFromByteArray(byte[] bytes) {
+		int len = bytes.length;
+
+		int number = len / 4;
+		for (int i = 0; i < number; i++) {
+			byte[] intByteRes = { bytes[i * 4], bytes[4 * i + 1],
+					bytes[4 * i + 2], bytes[4 * i + 3] };
+			System.out.print(fromByteArray(intByteRes) + "\t");
+		}
+	}
+
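+	// Assembles a 32-bit int from four little-endian bytes (parquet PLAIN encoding stores INT32 values little-endian).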
+	int fromByteArray(byte[] bytes) {
+		return bytes[3] << 24 | (bytes[2] & 0xFF) << 16
+				| (bytes[1] & 0xFF) << 8 | (bytes[0] & 0xFF);
+	}
+
+	public static void main(String[] args) throws Exception {
+		System.out.println("test parquet");
+
+		TestHAWQParquetOutput b = new TestHAWQParquetOutput(
+//				"/Users/bjcoe/parquet_tabledata/pa_seg1");
+				"/Users/gaod1/Perforce/gaod-hawq/gpsql/feature/hawq/cdb-pg/contrib/hawq-hadoop/hawq-mapreduce-parquet/input/simple/16399.1");
+//				"/Users/bjcoe/286730.1");//196608.1");			//required one column table
+//				"/Users/bjcoe/221199.1");//212992.1");			//required two columns table
+//				"/Users/bjcoe/221204.1");			//required three columns table
+//	"/Users/bjcoe/212997.1");			//required two columns table
+//				"/Users/bjcoe/172037.1");				//one column table
+//				"/Users/bjcoe/180224.1");		//two columns table
+	
+		/* print metadata part*/
+		b.getMetadata();
+		
+		/* print actual data*/
+		int rowGroupNo = b.readFooter.getBlocks().size();
+		for(int i = 0; i < rowGroupNo; i++)
+		{
+			BlockMetaData blockMetadata = b.readFooter.getBlocks().get(i);
+			int columnNo = blockMetadata.getColumns().size();
+			for(int j = 0; j < columnNo; j++)
+			{
+				b.getColumnDataInRowGroup(i, j);
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore b/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
new file mode 100644
index 0000000..314002f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
@@ -0,0 +1,2 @@
+target/
+test-data/*/output