Posted to commits@hawq.apache.org by rv...@apache.org on 2015/09/19 02:36:21 UTC
[38/51] [partial] incubator-hawq git commit: SGA import
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml b/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
new file mode 100644
index 0000000..43b49ea
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>hawq-hadoop</artifactId>
+ <groupId>com.pivotal.hawq</groupId>
+ <version>1.1.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>hawq-mapreduce-parquet</artifactId>
+ <packaging>jar</packaging>
+
+ <properties>
+ <parquet.hadoop.version>1.1.0</parquet.hadoop.version>
+ <parquet.format.version>1.0.0</parquet.format.version>
+ </properties>
+
+ <repositories>
+ <repository>
+ <id>sonatype-nexus-snapshots</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ <releases>
+ <enabled>false</enabled>
+ </releases>
+ <snapshots>
+ <enabled>true</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>hawq-mapreduce-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- Parquet artifacts -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-format</artifactId>
+ <version>${parquet.format.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${parquet.hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <!-- snappy-java-1.0.5 raises UnsatisfiedLinkError on old systems -->
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.atlassian.maven.plugins</groupId>
+ <artifactId>maven-clover2-plugin</artifactId>
+ <configuration>
+ <licenseLocation>../lib/clover.license</licenseLocation>
+ <excludes>
+ <!-- exclude output format files -->
+ <exclude>**/HAWQParquetOutputFormat.java</exclude>
+ <exclude>**/HAWQSchemaConverter.java</exclude>
+ <exclude>**/HAWQWriteSupport.java</exclude>
+ <exclude>**/HAWQRecordWriter.java</exclude>
+ <!-- ignore the dummy adapter -->
+ <exclude>**/ParentValueContainerAdapter.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
new file mode 100755
index 0000000..a8a6176
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-read-job.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+hadoop fs -rm -r $2
+
+export HADOOP_CLASSPATH=target/test-classes:target/hawq-mapreduce-parquet-1.0.0.jar:../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0.jar:lib/parquet-column-1.3.2.jar:lib/parquet-common-1.3.2.jar:lib/parquet-encoding-1.3.2.jar:lib/parquet-hadoop-1.3.2.jar:lib/parquet-format-1.0.0.jar
+
+# enable debug
+# export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5002"
+hadoop com.pivotal.hawq.mapreduce.parquet.HAWQParquetInputDriver -conf conf/hadoop-localjob.xml $1 $2
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
new file mode 100755
index 0000000..c32e154
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/run-write-job.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+hadoop fs -rm -r $2
+
+# TODO HAWQ only supports the 1.0.0 format; can we use a higher version of the parquet library to write files of a specific format version?
+
+export HADOOP_CLASSPATH=target/test-classes:target/hawq-mapreduce-parquet-1.0.0.jar:../hawq-mapreduce-common/target/hawq-mapreduce-common-1.0.0.jar:lib/parquet-column-1.0.0.jar:lib/parquet-common-1.0.0.jar:lib/parquet-encoding-1.0.0.jar:lib/parquet-hadoop-1.0.0.jar:lib/parquet-format-1.0.0.jar
+
+hadoop com.pivotal.hawq.mapreduce.parquet.HAWQParquetOutputDriver -conf conf/hadoop-localjob.xml $1 $2
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
new file mode 100644
index 0000000..c98de65
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetInputFormat.java
@@ -0,0 +1,74 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.google.common.collect.Lists;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.file.HAWQFileStatus;
+import com.pivotal.hawq.mapreduce.metadata.HAWQParquetTableMetadata;
+import com.pivotal.hawq.mapreduce.parquet.support.HAWQReadSupport;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import parquet.hadoop.ParquetInputFormat;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * An InputFormat that reads input data from a HAWQ Parquet table.
+ */
+public class HAWQParquetInputFormat extends ParquetInputFormat<HAWQRecord> {
+
+ private static HAWQFileStatus[] hawqFileStatuses;
+
+ public HAWQParquetInputFormat() {
+ super(HAWQReadSupport.class);
+ }
+
+ public static void setInput(Configuration conf, HAWQParquetTableMetadata metadata) {
+ hawqFileStatuses = metadata.getFileStatuses();
+ }
+
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext) throws IOException {
+ if (hawqFileStatuses == null) {
+ throw new IllegalStateException("Please call HAWQParquetInputFormat.setInput first!");
+ }
+
+ if (hawqFileStatuses.length == 0) {
+ return Lists.newArrayList(); // handle empty table
+ }
+
+ return super.getSplits(jobContext);
+ }
+
+ @Override
+ protected List<FileStatus> listStatus(JobContext jobContext) throws IOException {
+ List<FileStatus> result = Lists.newArrayList();
+ for (HAWQFileStatus hawqFileStatus : hawqFileStatuses) {
+ if (hawqFileStatus.getFileLength() == 0) continue; // skip empty file
+
+ Path path = new Path(hawqFileStatus.getFilePath());
+ FileSystem fs = path.getFileSystem(jobContext.getConfiguration());
+ FileStatus dfsStat = fs.getFileStatus(path);
+ // rewrite the file length because HAWQ records the logical EOF of the file, which may
+ // be smaller than the file's actual EOF
+ FileStatus hawqStat = new FileStatus(
+ hawqFileStatus.getFileLength(), // rewrite to logicalEOF
+ dfsStat.isDirectory(),
+ dfsStat.getReplication(),
+ dfsStat.getBlockSize(),
+ dfsStat.getModificationTime(),
+ dfsStat.getAccessTime(),
+ dfsStat.getPermission(),
+ dfsStat.getOwner(),
+ dfsStat.getGroup(),
+ dfsStat.getPath());
+ result.add(hawqStat);
+ }
+
+ return result;
+ }
+}
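A minimal sketch of how a job might wire up this input format, assuming a HAWQParquetTableMetadata instance has already been obtained (for example through the metadata classes in hawq-mapreduce-common). The driver class and names below are hypothetical and not part of this commit:

    import com.pivotal.hawq.mapreduce.metadata.HAWQParquetTableMetadata;
    import com.pivotal.hawq.mapreduce.parquet.HAWQParquetInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import java.io.IOException;

    // Hypothetical driver: register the table metadata, then use the input format.
    public class ExampleReadDriver {
        public static Job configureJob(Configuration conf, HAWQParquetTableMetadata metadata)
                throws IOException {
            // setInput must run before getSplits, otherwise an IllegalStateException is thrown
            HAWQParquetInputFormat.setInput(conf, metadata);
            Job job = new Job(conf, "hawq-parquet-read-example");
            job.setInputFormatClass(HAWQParquetInputFormat.class);
            // The record reader yields (Void, HAWQRecord) pairs, so mappers are
            // declared as Mapper<Void, HAWQRecord, KEYOUT, VALUEOUT>.
            return job;
        }
    }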
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
new file mode 100644
index 0000000..83deb5e
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputFormat.java
@@ -0,0 +1,29 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.support.HAWQWriteSupport;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.mapreduce.Job;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.util.ContextUtil;
+
+class HAWQParquetOutputFormat extends ParquetOutputFormat<HAWQRecord> {
+
+ private static HAWQSchema hawqSchema;
+
+ public HAWQParquetOutputFormat() {
+ super(new HAWQWriteSupport());
+ }
+
+ public static void setSchema(Job job, HAWQSchema schema) {
+ hawqSchema = schema;
+ HAWQWriteSupport.setSchema(ContextUtil.getConfiguration(job), hawqSchema);
+ }
+
+ public static HAWQRecord newRecord() {
+ if (hawqSchema == null) {
+ throw new IllegalStateException("you haven't set HAWQSchema yet");
+ }
+ return new HAWQRecord(hawqSchema); // TODO reuse record?
+ }
+}
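A sketch of the intended write path (hypothetical, not part of this commit). The class is package-private as committed, so a caller would live in com.pivotal.hawq.mapreduce.parquet; how the HAWQSchema itself is built is assumed to happen elsewhere:

    package com.pivotal.hawq.mapreduce.parquet;

    import com.pivotal.hawq.mapreduce.HAWQException;
    import com.pivotal.hawq.mapreduce.HAWQRecord;
    import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    import java.io.IOException;

    // Hypothetical helper showing the expected call order.
    class ExampleWriteJob {

        static Job configureJob(Configuration conf, HAWQSchema schema) throws IOException {
            Job job = new Job(conf, "hawq-parquet-write-example");
            HAWQParquetOutputFormat.setSchema(job, schema); // must precede newRecord()
            job.setOutputFormatClass(HAWQParquetOutputFormat.class);
            return job;
        }

        static HAWQRecord makeRecord() throws HAWQException {
            HAWQRecord record = HAWQParquetOutputFormat.newRecord();
            record.setInt(1, 42); // HAWQRecord field indexes start at 1
            return record;
        }
    }

A record built this way would then be emitted through the format's RecordWriter in a mapper or reducer.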
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
new file mode 100644
index 0000000..00964a5
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQBoxConverter.java
@@ -0,0 +1,72 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ * required double x1;
+ * required double y1;
+ * required double x2;
+ * required double y2;
+ * }
+ * => HAWQBox
+ */
+public class HAWQBoxConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private double x1;
+ private double y1;
+ private double x2;
+ private double y2;
+
+ public HAWQBoxConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.converters = new Converter[4];
+ this.converters[0] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x1 = value;
+ }
+ };
+ this.converters[1] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y1 = value;
+ }
+ };
+ this.converters[2] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x2 = value;
+ }
+ };
+ this.converters[3] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y2 = value;
+ }
+ };
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void end() {
+ try {
+ HAWQBox box = new HAWQBox(x1, y1, x2, y2);
+ parent.setBox(box);
+ } catch (HAWQException e) {}
+ }
+}
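The sibling geometry converters below (circle, line segment, path, point, polygon) follow the same pattern. A minimal sketch (hypothetical, not part of this commit) of how Parquet's record assembly drives such a converter: start(), one addDouble per primitive field, then end(), which hands the assembled value to the ParentValueContainer:

    import com.pivotal.hawq.mapreduce.HAWQException;
    import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
    import com.pivotal.hawq.mapreduce.parquet.convert.HAWQBoxConverter;
    import com.pivotal.hawq.mapreduce.parquet.convert.ParentValueContainerAdapter;
    import parquet.io.api.PrimitiveConverter;

    public class BoxConverterDemo {
        public static void main(String[] args) {
            HAWQBoxConverter converter = new HAWQBoxConverter(new ParentValueContainerAdapter() {
                @Override
                public void setBox(HAWQBox x) throws HAWQException {
                    System.out.println("assembled box: " + x); // receives the finished value
                }
            });
            converter.start();
            ((PrimitiveConverter) converter.getConverter(0)).addDouble(0.0); // x1
            ((PrimitiveConverter) converter.getConverter(1)).addDouble(0.0); // y1
            ((PrimitiveConverter) converter.getConverter(2)).addDouble(1.0); // x2
            ((PrimitiveConverter) converter.getConverter(3)).addDouble(1.0); // y2
            converter.end(); // builds HAWQBox(0, 0, 1, 1) and calls parent.setBox(...)
        }
    }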
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
new file mode 100644
index 0000000..f7883b6
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQCircleConverter.java
@@ -0,0 +1,64 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQCircle;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ * required double x;
+ * required double y;
+ * required double r;
+ * }
+ * => HAWQCircle
+ */
+public class HAWQCircleConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private double x;
+ private double y;
+ private double r;
+
+ public HAWQCircleConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.converters = new Converter[3];
+ this.converters[0] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x = value;
+ }
+ };
+ this.converters[1] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y = value;
+ }
+ };
+ this.converters[2] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ r = value;
+ }
+ };
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void end() {
+ try {
+ HAWQCircle circle = new HAWQCircle(x, y, r);
+ parent.setCircle(circle);
+ } catch (HAWQException e) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
new file mode 100644
index 0000000..7ad2c34
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQLineSegmentConverter.java
@@ -0,0 +1,72 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQLseg;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ * required double x1;
+ * required double y1;
+ * required double x2;
+ * required double y2;
+ * }
+ * => HAWQLseg
+ */
+public class HAWQLineSegmentConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private double x1;
+ private double y1;
+ private double x2;
+ private double y2;
+
+ public HAWQLineSegmentConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.converters = new Converter[4];
+ this.converters[0] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x1 = value;
+ }
+ };
+ this.converters[1] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y1 = value;
+ }
+ };
+ this.converters[2] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x2 = value;
+ }
+ };
+ this.converters[3] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y2 = value;
+ }
+ };
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void end() {
+ try {
+ HAWQLseg lseg = new HAWQLseg(x1, y1, x2, y2);
+ parent.setLseg(lseg);
+ } catch (HAWQException e) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
new file mode 100644
index 0000000..bbcdb5f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPathConverter.java
@@ -0,0 +1,66 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPath;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * group {
+ * required boolean is_open;
+ * repeated group {
+ * required double x;
+ * required double y;
+ * }
+ * }
+ * => HAWQPath
+ */
+public class HAWQPathConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private boolean isOpen;
+ private List<HAWQPoint> points;
+
+ public HAWQPathConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.points = new ArrayList<HAWQPoint>();
+ this.converters = new Converter[2];
+
+ this.converters[0] = new HAWQRecordConverter.HAWQPrimitiveConverter(new ParentValueContainerAdapter() {
+ @Override
+ public void setBoolean(boolean x) throws HAWQException {
+ HAWQPathConverter.this.isOpen = x;
+ }
+ });
+ this.converters[1] = new HAWQPointConverter(new ParentValueContainerAdapter() {
+ @Override
+ public void setPoint(HAWQPoint x) throws HAWQException {
+ HAWQPathConverter.this.points.add(x);
+ }
+ });
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ points.clear();
+ }
+
+ @Override
+ public void end() {
+ try {
+ HAWQPath path = new HAWQPath(isOpen, points);
+ parent.setPath(path);
+ } catch (HAWQException e) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
new file mode 100644
index 0000000..20e2884
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPointConverter.java
@@ -0,0 +1,56 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ * group {
+ * required double x;
+ * required double y;
+ * }
+ * => HAWQPoint
+ */
+public class HAWQPointConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private double x;
+ private double y;
+
+ public HAWQPointConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.converters = new Converter[2];
+ this.converters[0] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ x = value;
+ }
+ };
+ this.converters[1] = new PrimitiveConverter() {
+ @Override
+ public void addDouble(double value) {
+ y = value;
+ }
+ };
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void end() {
+ try {
+ HAWQPoint point = new HAWQPoint(x, y);
+ parent.setPoint(point);
+ } catch (HAWQException e) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
new file mode 100644
index 0000000..dbec017
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQPolygonConverter.java
@@ -0,0 +1,74 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.datatype.HAWQBox;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPoint;
+import com.pivotal.hawq.mapreduce.datatype.HAWQPolygon;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * group {
+ * required group boundbox {
+ * required double x1;
+ * required double y1;
+ * required double x2;
+ * required double y2;
+ * },
+ * repeated group points {
+ * required double x;
+ * required double y;
+ * }
+ * }
+ * => HAWQPolygon
+ */
+public class HAWQPolygonConverter extends GroupConverter {
+
+ private ParentValueContainer parent;
+ private Converter[] converters;
+
+ private HAWQBox boundbox;
+ private List<HAWQPoint> points;
+
+ public HAWQPolygonConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ this.converters = new Converter[2];
+ this.points = new ArrayList<HAWQPoint>();
+
+ this.converters[0] = new HAWQBoxConverter(new ParentValueContainerAdapter() {
+ @Override
+ public void setBox(HAWQBox x) throws HAWQException {
+ HAWQPolygonConverter.this.boundbox = x;
+ }
+ });
+
+ this.converters[1] = new HAWQPointConverter(new ParentValueContainerAdapter() {
+ @Override
+ public void setPoint(HAWQPoint x) throws HAWQException {
+ HAWQPolygonConverter.this.points.add(x);
+ }
+ });
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return this.converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ this.boundbox = null;
+ this.points.clear();
+ }
+
+ @Override
+ public void end() {
+ try {
+ HAWQPolygon polygon = new HAWQPolygon(points, boundbox);
+ parent.setPolygon(polygon);
+ } catch (HAWQException e) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
new file mode 100644
index 0000000..96df594
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordConverter.java
@@ -0,0 +1,552 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import com.pivotal.hawq.mapreduce.util.HAWQConvertUtil;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.MessageType;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+/**
+ * HAWQ's implementation of Parquet's GroupConverter.
+ */
+public class HAWQRecordConverter extends GroupConverter {
+
+ private final ParentValueContainer parent;
+ private HAWQSchema hawqSchema;
+ private final Converter[] converters;
+ private HAWQRecord currentRecord;
+
+ // TODO maybe HAWQRecordConverter(HAWQSchema requestedSchema, HAWQSchema hawqSchema) ?
+ public HAWQRecordConverter(MessageType requestedSchema, HAWQSchema hawqSchema) {
+ this(null, requestedSchema, hawqSchema);
+ }
+
+ public HAWQRecordConverter(ParentValueContainer parent, MessageType requestedSchema, HAWQSchema hawqSchema) {
+ this.parent = parent;
+ this.hawqSchema = hawqSchema;
+
+ int fieldsNum = hawqSchema.getFieldCount();
+ this.converters = new Converter[fieldsNum];
+
+ int fieldIndex = 0; // index of converter starts from 0
+ for (HAWQField field : hawqSchema.getFields()) {
+ final int recordFieldIndex = fieldIndex + 1; // index in HAWQRecord starts from 1
+ Converter fieldConverter = newConverter(field, new ParentValueContainer() {
+ @Override
+ public void setBoolean(boolean x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setBoolean(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setBit(HAWQVarbit x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setBit(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setByte(byte x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setByte(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setBytes(byte[] x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setBytes(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setShort(short x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setShort(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setInt(int x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setInt(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setLong(long x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setLong(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setFloat(float x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setFloat(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setDouble(double x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setDouble(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setBigDecimal(BigDecimal x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setBigDecimal(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setString(String x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setString(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setDate(Date x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setDate(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setTime(Time x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setTime(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setTimestamp(Timestamp x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setTimestamp(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setInterval(HAWQInterval x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setInterval(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setPoint(HAWQPoint x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setPoint(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setLseg(HAWQLseg x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setLseg(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setBox(HAWQBox x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setBox(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setCircle(HAWQCircle x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setCircle(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setPath(HAWQPath x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setPath(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setPolygon(HAWQPolygon x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setPolygon(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setMacaddr(HAWQMacaddr x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setMacaddr(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setInet(HAWQInet x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setInet(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setCidr(HAWQCidr x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setCidr(recordFieldIndex, x);
+ }
+
+ @Override
+ public void setArray(Array x) throws HAWQException {
+ throw new UnsupportedOperationException(); // TODO
+ }
+
+ @Override
+ public void setField(HAWQRecord x) throws HAWQException {
+ HAWQRecordConverter.this.currentRecord.setField(recordFieldIndex, x);
+ }
+ });
+ this.converters[fieldIndex++] = fieldConverter;
+ }
+ }
+
+ private Converter newConverter(HAWQField hawqType, ParentValueContainer parent) {
+ if (!hawqType.isPrimitive()) // FIXME
+ throw new RuntimeException("HAWQRecordConverter.newConverter does not implement group type converters yet");
+
+ switch (hawqType.asPrimitive().getType()) {
+ case BIT:case VARBIT:
+ return new HAWQBitsConverter(parent);
+ case BYTEA:
+ return new HAWQByteArrayConverter(parent);
+ /* number related type */
+ case BOOL:case INT4:case INT8:case FLOAT4:case FLOAT8:
+ return new HAWQPrimitiveConverter(parent);
+ case INT2:
+ return new HAWQShortConverter(parent);
+ case NUMERIC:
+ return new HAWQBigDecimalConverter(parent);
+ /* string related type */
+ case BPCHAR:case VARCHAR:case TEXT:case XML:
+ return new HAWQStringConverter(parent);
+ /* time related type */
+ case DATE:
+ return new HAWQDateConverter(parent);
+ case TIME:
+ return new HAWQTimeConverter(parent);
+ case TIMETZ:
+ return new HAWQTimeTZConverter(parent);
+ case TIMESTAMP:
+ return new HAWQTimestampConverter(parent);
+ case TIMESTAMPTZ:
+ return new HAWQTimestampTZConverter(parent);
+ case INTERVAL:
+ return new HAWQIntervalConverter(parent);
+ /* geometry related type */
+ case POINT:
+ return new HAWQPointConverter(parent);
+ case LSEG:
+ return new HAWQLineSegmentConverter(parent);
+ case PATH:
+ return new HAWQPathConverter(parent);
+ case BOX:
+ return new HAWQBoxConverter(parent);
+ case POLYGON:
+ return new HAWQPolygonConverter(parent);
+ case CIRCLE:
+ return new HAWQCircleConverter(parent);
+ /* other type */
+ case MACADDR:
+ return new HAWQMacaddrConverter(parent);
+ case INET:
+ return new HAWQInetConverter(parent);
+ case CIDR:
+ return new HAWQCidrConverter(parent);
+ default:
+ throw new UnsupportedOperationException("unsupported type " + hawqType);
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ currentRecord = new HAWQRecord(hawqSchema);
+ }
+
+ @Override
+ public void end() {
+ if (parent != null) {
+ try {
+ parent.setField(currentRecord);
+ } catch (HAWQException e) {}
+ }
+ }
+
+ public HAWQRecord getCurrentRecord() {
+ return currentRecord;
+ }
+
+ //////////////////////////////////////////////////////////////
+ /// converters from parquet data type to HAWQ data type
+ //////////////////////////////////////////////////////////////
+
+ static class HAWQPrimitiveConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQPrimitiveConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ try {
+ parent.setBoolean(value);
+ } catch (HAWQException e) {}
+ }
+
+ @Override
+ public void addInt(int value) {
+ try {
+ parent.setInt(value);
+ } catch (HAWQException e) {}
+ }
+
+ @Override
+ public void addLong(long value) {
+ try {
+ parent.setLong(value);
+ } catch (HAWQException e) {}
+ }
+
+ @Override
+ public void addFloat(float value) {
+ try {
+ parent.setFloat(value);
+ } catch (HAWQException e) {}
+ }
+
+ @Override
+ public void addDouble(double value) {
+ try {
+ parent.setDouble(value);
+ } catch (HAWQException e) {}
+ }
+ }
+
+ static class HAWQShortConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQShortConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addInt(int value) {
+ try {
+ parent.setShort((short) value);
+ } catch (HAWQException e) {}
+ }
+ }
+
+ static class HAWQBigDecimalConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQBigDecimalConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ // FIXME handle the case where bytesToDecimal returns "NaN"
+ parent.setBigDecimal((BigDecimal) HAWQConvertUtil.bytesToDecimal(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to BigDecimal", e);
+ }
+ }
+ }
+
+ static class HAWQStringConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQStringConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setString(value.toStringUsingUTF8());
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to String", e);
+ }
+ }
+ }
+
+ static class HAWQBitsConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQBitsConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setBit(HAWQConvertUtil.bytesToVarbit(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to Varbit");
+ }
+ }
+ }
+
+ static class HAWQByteArrayConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQByteArrayConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setBytes(value.getBytes());
+ } catch (HAWQException e) {}
+ }
+ }
+
+ // date is stored as a 4-byte int
+ static class HAWQDateConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQDateConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addInt(int value) {
+ try {
+ parent.setDate(HAWQConvertUtil.toDate(value));
+
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Integer to Date", e);
+ }
+ }
+ }
+
+ // time (without timezone) is stored as an 8-byte long
+ static class HAWQTimeConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQTimeConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(long value) {
+ try {
+ parent.setTime(HAWQConvertUtil.toTime(value));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Long to Time", e);
+ }
+ }
+ }
+
+ // time (with timezone) is stored as a 12-byte binary
+ static class HAWQTimeTZConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQTimeTZConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setTime(HAWQConvertUtil.toTimeTz(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to Time (with timezone)", e);
+ }
+ }
+ }
+
+ // timestamp (without timezone) is stored as an 8-byte long
+ static class HAWQTimestampConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQTimestampConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(long value) {
+ try {
+ parent.setTimestamp(HAWQConvertUtil.toTimestamp(value, false));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Long to Timestamp", e);
+ }
+ }
+ }
+
+ // timestamp (with timezone) is stored as an 8-byte long
+ static class HAWQTimestampTZConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQTimestampTZConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(long value) {
+ try {
+ parent.setTimestamp(HAWQConvertUtil.toTimestamp(value, true));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Long to Timestamp (with timezone)", e);
+ }
+ }
+ }
+
+ // interval is stored as a 16-byte binary
+ static class HAWQIntervalConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQIntervalConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setInterval(HAWQConvertUtil.bytesToInterval(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to Interval", e);
+ }
+ }
+ }
+
+ // macaddr is stored as binary
+ static class HAWQMacaddrConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ public HAWQMacaddrConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setMacaddr(new HAWQMacaddr(value.getBytes()));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to HAWQMacaddr");
+ }
+ }
+ }
+
+ // inet is stored as binary
+ static class HAWQInetConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ HAWQInetConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setInet(HAWQConvertUtil.bytesToInet(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to HAWQInet");
+ }
+ }
+ }
+
+ // cidr is stored as binary
+ static class HAWQCidrConverter extends PrimitiveConverter {
+ private ParentValueContainer parent;
+
+ HAWQCidrConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ try {
+ parent.setCidr(HAWQConvertUtil.bytesToCidr(value.getBytes(), 0));
+ } catch (HAWQException e) {
+ throw new RuntimeException("error during conversion from Binary to HAWQCidr");
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
new file mode 100644
index 0000000..fd7000f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordMaterializer.java
@@ -0,0 +1,33 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * HAWQ's implementation of the RecordMaterializer interface.
+ *
+ * <p>This class materializes HAWQRecord objects from a stream of Parquet data,
+ * using a HAWQRecordConverter internally to convert data types between HAWQ and Parquet.
+ *
+ */
+public class HAWQRecordMaterializer extends RecordMaterializer<HAWQRecord> {
+
+ private HAWQRecordConverter rootConverter;
+
+ public HAWQRecordMaterializer(MessageType requestedSchema, HAWQSchema hawqSchema) {
+ rootConverter = new HAWQRecordConverter(requestedSchema, hawqSchema);
+ }
+
+ @Override
+ public HAWQRecord getCurrentRecord() {
+ return rootConverter.getCurrentRecord();
+ }
+
+ @Override
+ public GroupConverter getRootConverter() {
+ return rootConverter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
new file mode 100644
index 0000000..cb38157
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/HAWQRecordWriter.java
@@ -0,0 +1,93 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQGroupField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+
+/**
+ * Converts a HAWQRecord into Parquet structure and writes it to a RecordConsumer.
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public class HAWQRecordWriter {
+
+ private RecordConsumer consumer;
+ private HAWQSchema schema;
+
+ public HAWQRecordWriter(RecordConsumer consumer, HAWQSchema schema) {
+ this.consumer = consumer;
+ this.schema = schema;
+ }
+
+ public void writeRecord(HAWQRecord record) throws HAWQException {
+ consumer.startMessage();
+ for (int i = 1; i <= schema.getFieldCount(); i++) {
+ writeField(schema.getField(i), i, record.getObject(i));
+ }
+ consumer.endMessage();
+ }
+
+ private void writeField(HAWQField fieldSchema, int fieldIndex, Object value) throws HAWQException {
+ if (value == null) {
+ if (fieldSchema.isOptional()) return;
+ else throw new HAWQException("missing value for required field " + fieldSchema.getName());
+ }
+
+ consumer.startField(fieldSchema.getName(), fieldIndex - 1);
+ if (fieldSchema.isPrimitive()) {
+ writePrimitive(fieldSchema.asPrimitive(), value);
+ } else {
+ writeGroup(fieldSchema.asGroup(), (HAWQRecord) value);
+ }
+ consumer.endField(fieldSchema.getName(), fieldIndex - 1);
+ }
+
+ private void writeGroup(HAWQGroupField groupFieldSchema, HAWQRecord value) throws HAWQException {
+ consumer.startGroup();
+ for (int i = 1; i <= groupFieldSchema.getFieldCount(); i++) {
+ writeField(groupFieldSchema.getField(i), i, value.getObject(i));
+ }
+ consumer.endGroup();
+ }
+
+ private void writePrimitive(HAWQPrimitiveField primitiveFieldSchema, Object value) {
+ // TODO
+ switch (primitiveFieldSchema.getType()) {
+ case BOOL:
+ consumer.addBoolean((Boolean) value);
+ break;
+ case BYTEA:
+ consumer.addBinary(Binary.fromByteArray((byte[]) value));
+ break;
+ case INT2:
+ consumer.addInteger(((Short) value).intValue());
+ break;
+ case INT4:
+ consumer.addInteger((Integer) value);
+ break;
+ case INT8:
+ consumer.addLong((Long) value);
+ break;
+ case FLOAT4:
+ consumer.addFloat((Float) value);
+ break;
+ case FLOAT8:
+ consumer.addDouble((Double) value);
+ break;
+ case VARCHAR:case TEXT:
+ consumer.addBinary(Binary.fromString((String) value));
+ break;
+ case DATE:
+ // TODO DATE values are not written yet (silently dropped)
+ break;
+ case TIME:
+ // TODO TIME values are not written yet (silently dropped)
+ break;
+ default:
+ throw new RuntimeException("unsupported type in HAWQRecordWriter: " + primitiveFieldSchema.getType());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
new file mode 100644
index 0000000..b5363e6
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainer.java
@@ -0,0 +1,83 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public interface ParentValueContainer {
+
+ ///////////////////////////////////////////////////////////////////////
+ // BOOL, BIT, VARBIT, BYTEA, INT2, INT4, INT8, FLOAT4, FLOAT8, NUMERIC
+ ///////////////////////////////////////////////////////////////////////
+
+ void setBoolean(boolean x) throws HAWQException;
+
+ void setBit(HAWQVarbit x) throws HAWQException;
+
+ void setByte(byte x) throws HAWQException;
+
+ void setBytes(byte[] x) throws HAWQException;
+
+ void setShort(short x) throws HAWQException;
+
+ void setInt(int x) throws HAWQException;
+
+ void setLong(long x) throws HAWQException;
+
+ void setFloat(float x) throws HAWQException;
+
+ void setDouble(double x) throws HAWQException;
+
+ void setBigDecimal(BigDecimal x) throws HAWQException;
+
+ ///////////////////////////////////////////////////////////////////////
+ // CHAR, BPCHAR, VARCHAR, TEXT, DATE, TIME, TIMETZ, TIMESTAMP, TIMESTAMPTZ, INTERVAL
+ ///////////////////////////////////////////////////////////////////////
+
+ void setString(String x) throws HAWQException;
+
+ void setDate(Date x) throws HAWQException;
+
+ void setTime(Time x) throws HAWQException;
+
+ void setTimestamp(Timestamp x) throws HAWQException;
+
+ void setInterval(HAWQInterval x) throws HAWQException;
+
+ ///////////////////////////////////////////////////////////////////////
+ // POINT, LSEG, BOX, CIRCLE, PATH, POLYGON, MACADDR, INET, CIDR, XML
+ ///////////////////////////////////////////////////////////////////////
+
+ void setPoint(HAWQPoint x) throws HAWQException;
+
+ void setLseg(HAWQLseg x) throws HAWQException;
+
+ void setBox(HAWQBox x) throws HAWQException;
+
+ void setCircle(HAWQCircle x) throws HAWQException;
+
+ void setPath(HAWQPath x) throws HAWQException;
+
+ void setPolygon(HAWQPolygon x) throws HAWQException;
+
+ void setMacaddr(HAWQMacaddr x) throws HAWQException;
+
+ void setInet(HAWQInet x) throws HAWQException;
+
+ void setCidr(HAWQCidr x) throws HAWQException;
+
+ ///////////////////////////////////////////////////////////////////////
+ // other
+ ///////////////////////////////////////////////////////////////////////
+
+ void setArray(Array x) throws HAWQException;
+
+ void setField(HAWQRecord x) throws HAWQException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
new file mode 100644
index 0000000..3ae4ac1
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/convert/ParentValueContainerAdapter.java
@@ -0,0 +1,143 @@
+package com.pivotal.hawq.mapreduce.parquet.convert;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.datatype.*;
+
+import java.math.BigDecimal;
+import java.sql.Array;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+
+public abstract class ParentValueContainerAdapter implements ParentValueContainer {
+ @Override
+ public void setBoolean(boolean x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBit(HAWQVarbit x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setByte(byte x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBytes(byte[] x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setShort(short x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setInt(int x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setLong(long x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setFloat(float x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDouble(double x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBigDecimal(BigDecimal x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setString(String x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setDate(Date x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setTime(Time x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setTimestamp(Timestamp x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setInterval(HAWQInterval x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setPoint(HAWQPoint x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setLseg(HAWQLseg x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setBox(HAWQBox x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCircle(HAWQCircle x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setPath(HAWQPath x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setPolygon(HAWQPolygon x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setMacaddr(HAWQMacaddr x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setInet(HAWQInet x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setCidr(HAWQCidr x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setArray(Array x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setField(HAWQRecord x) throws HAWQException {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
new file mode 100644
index 0000000..bdeb63e
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQReadSupport.java
@@ -0,0 +1,50 @@
+package com.pivotal.hawq.mapreduce.parquet.support;
+
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.convert.HAWQRecordMaterializer;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * User: gaod1
+ * Date: 8/8/13
+ */
+public class HAWQReadSupport extends ReadSupport<HAWQRecord> {
+
+ private static final String KEY_HAWQ_SCHEMA = "hawq.schema";
+ private static final String HAWQ_REQUESTED_SCHEMA = "hawq.schema.requested";
+
+ @Override
+ public ReadContext init(Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema) {
+
+// String requestedProjectionString = configuration.get(HAWQ_REQUESTED_SCHEMA);
+//
+// if (requestedProjectionString == null) { // read all data
+// return new ReadContext(fileSchema);
+// }
+//
+// HAWQSchema requestedHAWQSchema = HAWQSchema.fromString(requestedProjectionString);
+// MessageType requestedSchema = HAWQSchemaConverter.convertToParquet(requestedHAWQSchema);
+// return new ReadContext(requestedSchema);
+
+ return new ReadContext(fileSchema);
+ }
+
+ @Override
+ public RecordMaterializer<HAWQRecord> prepareForRead(Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema, ReadContext readContext) {
+
+ HAWQSchema hawqSchema = HAWQSchema.fromString(keyValueMetaData.get(KEY_HAWQ_SCHEMA));
+ return new HAWQRecordMaterializer(
+ readContext.getRequestedSchema(), // requested parquet schema
+ hawqSchema); // corresponding requested HAWQSchema
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
new file mode 100644
index 0000000..6248075
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/support/HAWQWriteSupport.java
@@ -0,0 +1,57 @@
+package com.pivotal.hawq.mapreduce.parquet.support;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.parquet.convert.HAWQRecordWriter;
+import com.pivotal.hawq.mapreduce.parquet.util.HAWQSchemaConverter;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public class HAWQWriteSupport extends WriteSupport<HAWQRecord> {
+
+ // key of the HAWQ schema in the extraKeyValue metadata of HAWQ's parquet file
+ private static final String HAWQ_SCHEMA_KEY = "hawq.schema";
+
+ private HAWQSchema hawqSchema;
+ private MessageType parquetSchema;
+ private HAWQRecordWriter recordWriter;
+
+ public static void setSchema(Configuration configuration, HAWQSchema hawqSchema) {
+ configuration.set("parquet.hawq.schema", hawqSchema.toString());
+ }
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ hawqSchema = HAWQSchema.fromString(configuration.get("parquet.hawq.schema"));
+ parquetSchema = HAWQSchemaConverter.convertToParquet(hawqSchema);
+
+ Map<String, String> extraMetaData = new HashMap<String, String>();
+ extraMetaData.put(HAWQ_SCHEMA_KEY, hawqSchema.toString());
+
+ return new WriteContext(parquetSchema, extraMetaData);
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ recordWriter = new HAWQRecordWriter(recordConsumer, hawqSchema);
+ }
+
+ @Override
+ public void write(HAWQRecord record) {
+ try {
+ recordWriter.writeRecord(record);
+ } catch (HAWQException e) {
+ throw new RuntimeException("failed to write record", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
new file mode 100644
index 0000000..73a96b9
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/main/java/com/pivotal/hawq/mapreduce/parquet/util/HAWQSchemaConverter.java
@@ -0,0 +1,76 @@
+package com.pivotal.hawq.mapreduce.parquet.util;
+
+import com.pivotal.hawq.mapreduce.schema.HAWQField;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts a HAWQ schema into the equivalent Parquet message type.
+ *
+ * User: gaod1
+ * Date: 9/10/13
+ */
+public final class HAWQSchemaConverter {
+
+ /**
+ * Convert from HAWQ schema to Parquet schema.
+ *
+ * @param hawqSchema schema to be converted.
+ * @return corresponding Parquet schema.
+ */
+ public static MessageType convertToParquet(HAWQSchema hawqSchema) {
+ List<Type> parquetFields = new ArrayList<Type>();
+ for (HAWQField hawqField : hawqSchema.getFields()) {
+ parquetFields.add(convertField(hawqField.asPrimitive()));
+ }
+ return new MessageType(hawqSchema.getName(), parquetFields);
+ }
+
+ private static Type convertField(HAWQPrimitiveField hawqField) {
+ // FIXME: user-defined types (UDTs) are not handled yet; only primitive fields are converted
+ String name = hawqField.getName();
+ Repetition repetition = getRepetition(hawqField);
+ switch (hawqField.getType()) {
+ case BOOL:
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ case BYTEA:
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ case INT2: case INT4:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ case INT8:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ case FLOAT4:
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ case FLOAT8:
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ case VARCHAR:
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name, OriginalType.UTF8);
+ /* time-related type */
+ case DATE:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ case TIME:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ case TIMETZ:
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ case TIMESTAMP:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ case TIMESTAMPTZ:
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ case INTERVAL:
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ default:
+ throw new RuntimeException("unsupported hawq type: " + hawqField.getType().name());
+ }
+ }
+
+ private static Repetition getRepetition(HAWQField field) {
+ return field.isOptional() ? Repetition.OPTIONAL : Repetition.REQUIRED;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
new file mode 100644
index 0000000..245aba7
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/HAWQParquetOutputDriver.java
@@ -0,0 +1,189 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import com.pivotal.hawq.mapreduce.HAWQException;
+import com.pivotal.hawq.mapreduce.HAWQRecord;
+import com.pivotal.hawq.mapreduce.schema.HAWQPrimitiveField;
+import com.pivotal.hawq.mapreduce.schema.HAWQSchema;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.io.IOException;
+
+/**
+ * Example MapReduce job for writing HAWQ Parquet files with HAWQParquetOutputFormat.
+ * Each line of the text input gives the number of records the mapper should generate.
+ *
+ * User: gaod1
+ * Date: 9/16/13
+ */
+public class HAWQParquetOutputDriver extends Configured implements Tool {
+
+ public static void main(String[] args) throws Exception {
+ int exitCode = ToolRunner.run(new HAWQParquetOutputDriver(), args);
+ System.exit(exitCode);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Job job = new Job(getConf(), "HAWQParquetOutputFormat");
+ job.setJarByClass(HAWQParquetOutputDriver.class);
+
+ job.setOutputFormatClass(HAWQParquetOutputFormat.class);
+
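+ // Exactly one schema/mapper pair below is active at a time; the commented-out
+ // alternatives cover the other supported column types.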
+ /*
+ // int2 int4 int8
+ HAWQSchema schema = new HAWQSchema("t_int",
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.INT2, "col_short"),
+ HAWQSchema.optional_field(HAWQPrimitiveField.PrimitiveType.INT4, "col_int"),
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.INT8, "col_long")
+ );
+ job.setMapperClass(WriteIntMapper.class);
+ */
+
+ /*
+ // varchar
+ HAWQSchema schema = new HAWQSchema("t_varchar",
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.VARCHAR, "col_varchar")
+ );
+ job.setMapperClass(WriteVarcharMapper.class);
+ */
+
+ /*
+ // float4 float8
+ HAWQSchema schema = new HAWQSchema("t_floating",
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.FLOAT4, "col_float"),
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.FLOAT8, "col_long")
+ );
+ job.setMapperClass(WriteFloatingNumberMapper.class);
+ */
+
+ // boolean
+// HAWQSchema schema = new HAWQSchema("t_boolean",
+// HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.BOOL, "col_bool"));
+// job.setMapperClass(WriteBooleanMapper.class);
+
+ // byte array
+ HAWQSchema schema = new HAWQSchema("t_bytea",
+ HAWQSchema.required_field(HAWQPrimitiveField.PrimitiveType.BYTEA, "col_bytea"));
+ job.setMapperClass(WriteByteArrayMapper.class);
+
+ HAWQParquetOutputFormat.setSchema(job, schema);
+
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ HAWQParquetOutputFormat.setOutputPath(job, new Path(args[1]));
+
+ job.setNumReduceTasks(0);
+
+ job.setMapOutputKeyClass(Void.class);
+ job.setMapOutputValueClass(HAWQRecord.class);
+
+ return job.waitForCompletion(true) ? 0 : 1;
+ }
+
+ private static class WriteIntMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+
+ private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ int recordNum = Integer.parseInt(value.toString());
+ try {
+ for (int i = 0; i < recordNum; i++) {
+ record.reset();
+ record.setShort(1, (short) (i + 1));
+ if (i % 2 == 0) {
+ record.setInt(2, i);
+ }
+ record.setLong(3, i * 100);
+ context.write(null, record);
+ }
+
+ } catch (HAWQException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static class WriteVarcharMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+ private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ int recordNum = Integer.parseInt(value.toString());
+ try {
+ for (int i = 0; i < recordNum; i++) {
+ record.reset();
+ record.setString(1, "hello" + i);
+ context.write(null, record);
+ }
+
+ } catch (HAWQException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static class WriteFloatingNumberMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+ private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ int recordNum = Integer.parseInt(value.toString());
+ try {
+ for (int i = 0; i < recordNum; i++) {
+ record.reset();
+ record.setFloat(1, 1.0f * i);
+ record.setDouble(2, 2 * Math.PI * i);
+ context.write(null, record);
+ }
+
+ } catch (HAWQException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static class WriteBooleanMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+ private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ int recordNum = Integer.parseInt(value.toString());
+ try {
+ for (int i = 0; i < recordNum; i++) {
+ record.reset();
+ record.setBoolean(1, i % 2 == 0);
+ context.write(null, record);
+ }
+
+ } catch (HAWQException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+
+ private static class WriteByteArrayMapper extends Mapper<LongWritable, Text, Void, HAWQRecord> {
+ private HAWQRecord record = HAWQParquetOutputFormat.newRecord();
+
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ int recordNum = Integer.parseInt(value.toString());
+ try {
+ for (int i = 0; i < recordNum; i++) {
+ record.reset();
+ record.setBytes(1, String.format("hello %d", i).getBytes());
+ context.write(null, record);
+ }
+
+ } catch (HAWQException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
new file mode 100644
index 0000000..1947534
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-parquet/src/test/java/com/pivotal/hawq/mapreduce/parquet/TestHAWQParquetOutput.java
@@ -0,0 +1,142 @@
+package com.pivotal.hawq.mapreduce.parquet;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+
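+/**
+ * Manual inspection utility (not a JUnit test) that dumps metadata and raw page data
+ * from a HAWQ-generated Parquet file. Run {@link #main(String[])} against a local file.
+ */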
+public class TestHAWQParquetOutput {
+ private ParquetMetadata readFooter;
+ private ParquetFileReader reader;
+ private Configuration configuration;
+ private Path path;
+ private MessageType schema;
+
+ public TestHAWQParquetOutput(String filepath) throws Exception {
+ File parquetFile = new File(filepath).getAbsoluteFile();
+ path = new Path(parquetFile.toURI());
+ configuration = new Configuration();
+ readFooter = ParquetFileReader.readFooter(configuration, path);
+ schema = readFooter.getFileMetaData().getSchema();
+ }
+
+ public int getRowGroupCount() {
+ int count = readFooter.getBlocks().size();
+ System.out.println("-------------RowGroup Count: " + count);
+ return count;
+ }
+
+ public long getRowCount(int rowGroupIndex) {
+ long count = readFooter.getBlocks().get(rowGroupIndex).getRowCount();
+ System.out.println("-------------Row Count in RowGroup" + rowGroupIndex
+ + ": " + count);
+ return count;
+ }
+
+ public int getColumnCountInRowGroup(int rowGroupIndex) {
+ int count = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+ .size();
+ System.out.println("-------------Column Count in RowGroup"
+ + rowGroupIndex + ": " + count);
+ return count;
+ }
+
+ public void getColumnType(int rowGroupIndex, int columnIndex) {
+ String type = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+ .get(columnIndex).getType().toString();
+ System.out.println("-------------Type of Column" + columnIndex
+ + " in RowGroup" + rowGroupIndex + ": " + type);
+ }
+
+ public void getColumnCodec(int rowGroupIndex, int columnIndex) {
+ String codec = readFooter.getBlocks().get(rowGroupIndex).getColumns()
+ .get(columnIndex).getCodec().toString();
+ System.out.println("-------------Codec of Column" + columnIndex
+ + " in RowGroup" + rowGroupIndex + ": " + codec);
+ }
+
+ public void getMetadata() {
+ String metadata = readFooter.toString();
+ System.out.println(metadata);
+ }
+
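+ /**
+ * Reads row groups sequentially up to rowGroupIndex, then dumps the raw bytes of every
+ * page of the given column. The dump interprets the bytes as little-endian 32-bit
+ * integers, so it is only meaningful for plain-encoded INT32 columns.
+ */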
+ public void getColumnDataInRowGroup(int rowGroupIndex, int columnIndex)
+ throws Exception {
+ PageReadStore pages = null;
+ int valueCount;
+ reader = new ParquetFileReader(configuration, path,
+ readFooter.getBlocks(), schema.getColumns());
+ for (int i = 0; i <= rowGroupIndex; ++i) {
+ pages = reader.readNextRowGroup();
+ }
+ PageReader pageReader = pages.getPageReader(schema.getColumns().get(
+ columnIndex));
+ Page page = pageReader.readPage();
+
+ /* read out all the data pages */
+ while (page != null) {
+ valueCount = page.getValueCount();
+ System.out.println("-------------value count of Column" + columnIndex
+ + " in RowGroup" + rowGroupIndex + ": " + valueCount);
+ printIntFromByteArray(page.getBytes().toByteArray());
+ System.out.println("");
+ page = pageReader.readPage();
+ }
+ reader.close();
+ }
+
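+ // Interpret the raw bytes as consecutive little-endian 32-bit integers and print them.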
+ public void printIntFromByteArray(byte[] bytes) {
+ int len = bytes.length;
+
+ int number = len / 4;
+ for (int i = 0; i < number; i++) {
+ byte[] intByteRes = { bytes[i * 4], bytes[4 * i + 1],
+ bytes[4 * i + 2], bytes[4 * i + 3] };
+ System.out.print(fromByteArray(intByteRes) + "\t");
+ }
+ }
+
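+ // assemble an int from four bytes in little-endian order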
+ int fromByteArray(byte[] bytes) {
+ return bytes[3] << 24 | (bytes[2] & 0xFF) << 16
+ | (bytes[1] & 0xFF) << 8 | (bytes[0] & 0xFF);
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.out.println("test parquet");
+
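+ // NOTE: adjust the path below to a locally available HAWQ Parquet data file.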
+ TestHAWQParquetOutput b = new TestHAWQParquetOutput(
+// "/Users/bjcoe/parquet_tabledata/pa_seg1");
+ "/Users/gaod1/Perforce/gaod-hawq/gpsql/feature/hawq/cdb-pg/contrib/hawq-hadoop/hawq-mapreduce-parquet/input/simple/16399.1");
+// "/Users/bjcoe/286730.1");//196608.1"); //required one column table
+// "/Users/bjcoe/221199.1");//212992.1"); //required two columns table
+// "/Users/bjcoe/221204.1"); //required three columns table
+// "/Users/bjcoe/212997.1"); //required two columns table
+// "/Users/bjcoe/172037.1"); //one column table
+// "/Users/bjcoe/180224.1"); //two columns table
+
+ /* print metadata part*/
+ b.getMetadata();
+
+ /* print actual data*/
+ int rowGroupNo = b.readFooter.getBlocks().size();
+ for (int i = 0; i < rowGroupNo; i++) {
+ BlockMetaData blockMetadata = b.readFooter.getBlocks().get(i);
+ int columnNo = blockMetadata.getColumns().size();
+ for (int j = 0; j < columnNo; j++) {
+ b.getColumnDataInRowGroup(i, j);
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8b26974c/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
----------------------------------------------------------------------
diff --git a/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore b/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
new file mode 100644
index 0000000..314002f
--- /dev/null
+++ b/contrib/hawq-hadoop/hawq-mapreduce-tool/.gitignore
@@ -0,0 +1,2 @@
+target/
+test-data/*/output