Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:41 UTC
[07/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
new file mode 100644
index 0000000..81a1ffd
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java
@@ -0,0 +1,623 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.index.bst;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.index.IndexMethod;
+import org.apache.tajo.storage.index.IndexWriter;
+import org.apache.tajo.storage.index.OrderIndexReader;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Set;
+import java.util.TreeMap;
+
+import static org.apache.tajo.index.IndexProtos.TupleComparatorProto;
+
+/**
+ * This is a two-level binary search tree index, a kind of value-list index
+ * structure. It is therefore inefficient when many of the key values are
+ * identical. The BST index performs best when the selectivity of the rows
+ * to be retrieved is less than 5%.
+ * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe.
+ */
+public class BSTIndex implements IndexMethod {
+ private static final Log LOG = LogFactory.getLog(BSTIndex.class);
+
+ public static final int ONE_LEVEL_INDEX = 1;
+ public static final int TWO_LEVEL_INDEX = 2;
+
+ private final Configuration conf;
+
+ public BSTIndex(final Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema,
+ TupleComparator comparator) throws IOException {
+ return new BSTIndexWriter(fileName, level, keySchema, comparator);
+ }
+
+ @Override
+ public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+ return new BSTIndexReader(fileName, keySchema, comparator);
+ }
+
+ public BSTIndexReader getIndexReader(Path fileName) throws IOException {
+ return new BSTIndexReader(fileName);
+ }
+
+ public class BSTIndexWriter extends IndexWriter implements Closeable {
+ private FSDataOutputStream out;
+ private FileSystem fs;
+ private int level;
+ private int loadNum = 4096;
+ private Path fileName;
+
+ private final Schema keySchema;
+    private final TupleComparator comparator;
+ private final KeyOffsetCollector collector;
+ private KeyOffsetCollector rootCollector;
+
+ private Tuple firstKey;
+ private Tuple lastKey;
+
+ private RowStoreEncoder rowStoreEncoder;
+
+
+    /**
+     * Constructor.
+     *
+     * @param level
+     *          BSTIndex.ONE_LEVEL_INDEX or BSTIndex.TWO_LEVEL_INDEX
+     * @throws java.io.IOException
+     */
+ public BSTIndexWriter(final Path fileName, int level, Schema keySchema,
+ TupleComparator comparator) throws IOException {
+ this.fileName = fileName;
+ this.level = level;
+ this.keySchema = keySchema;
+      this.comparator = comparator;
+ this.collector = new KeyOffsetCollector(comparator);
+ this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema);
+ }
+
+ public void setLoadNum(int loadNum) {
+ this.loadNum = loadNum;
+ }
+
+ public void open() throws IOException {
+ fs = fileName.getFileSystem(conf);
+ if (fs.exists(fileName)) {
+ throw new IOException("ERROR: index file (" + fileName + " already exists");
+ }
+ out = fs.create(fileName);
+ }
+
+ @Override
+ public void write(Tuple key, long offset) throws IOException {
+      if (firstKey == null || comparator.compare(key, firstKey) < 0) {
+ firstKey = key;
+ }
+      if (lastKey == null || comparator.compare(lastKey, key) < 0) {
+ lastKey = key;
+ }
+
+ collector.put(key, offset);
+ }
+
+ public TupleComparator getComparator() {
+      return this.comparator;
+ }
+
+ public void flush() throws IOException {
+ out.flush();
+ }
+
+ public void writeHeader(int entryNum) throws IOException {
+ // schema
+ byte [] schemaBytes = keySchema.getProto().toByteArray();
+ out.writeInt(schemaBytes.length);
+ out.write(schemaBytes);
+
+ // comparator
+      byte [] comparatorBytes = comparator.getProto().toByteArray();
+ out.writeInt(comparatorBytes.length);
+ out.write(comparatorBytes);
+
+ // level
+ out.writeInt(this.level);
+ // entry
+ out.writeInt(entryNum);
+ if (entryNum > 0) {
+ byte [] minBytes = rowStoreEncoder.toBytes(firstKey);
+ out.writeInt(minBytes.length);
+ out.write(minBytes);
+ byte [] maxBytes = rowStoreEncoder.toBytes(lastKey);
+ out.writeInt(maxBytes.length);
+ out.write(maxBytes);
+ }
+ out.flush();
+ }
+
+ public void close() throws IOException {
+ /* two level initialize */
+ if (this.level == TWO_LEVEL_INDEX) {
+        rootCollector = new KeyOffsetCollector(this.comparator);
+ }
+
+ /* data writing phase */
+ TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap();
+ Set<Tuple> keySet = keyOffsetMap.keySet();
+
+ int entryNum = keySet.size();
+ writeHeader(entryNum);
+
+ int loadCount = this.loadNum - 1;
+ for (Tuple key : keySet) {
+
+ if (this.level == TWO_LEVEL_INDEX) {
+ loadCount++;
+ if (loadCount == this.loadNum) {
+ rootCollector.put(key, out.getPos());
+ loadCount = 0;
+ }
+ }
+ /* key writing */
+ byte[] buf = rowStoreEncoder.toBytes(key);
+ out.writeInt(buf.length);
+ out.write(buf);
+
+ LinkedList<Long> offsetList = keyOffsetMap.get(key);
+ /* offset num writing */
+ int offsetSize = offsetList.size();
+ out.writeInt(offsetSize);
+ /* offset writing */
+ for (Long offset : offsetList) {
+ out.writeLong(offset);
+ }
+ }
+
+ out.flush();
+ out.close();
+ keySet.clear();
+ collector.clear();
+
+ FSDataOutputStream rootOut = null;
+ /* root index creating phase */
+ if (this.level == TWO_LEVEL_INDEX) {
+ TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap();
+ keySet = rootMap.keySet();
+
+ rootOut = fs.create(new Path(fileName + ".root"));
+ rootOut.writeInt(this.loadNum);
+ rootOut.writeInt(keySet.size());
+
+ /* root key writing */
+ for (Tuple key : keySet) {
+ byte[] buf = rowStoreEncoder.toBytes(key);
+ rootOut.writeInt(buf.length);
+ rootOut.write(buf);
+
+ LinkedList<Long> offsetList = rootMap.get(key);
+          if (offsetList.size() != 1) {
+            throw new IOException("A root index entry must have exactly one offset, but found " + offsetList.size());
+ }
+ rootOut.writeLong(offsetList.getFirst());
+
+ }
+ rootOut.flush();
+ rootOut.close();
+
+ keySet.clear();
+ rootCollector.clear();
+ }
+ }
+
+ private class KeyOffsetCollector {
+ private TreeMap<Tuple, LinkedList<Long>> map;
+
+ public KeyOffsetCollector(TupleComparator comparator) {
+ map = new TreeMap<Tuple, LinkedList<Long>>(comparator);
+ }
+
+ public void put(Tuple key, long offset) {
+ if (map.containsKey(key)) {
+ map.get(key).add(offset);
+ } else {
+ LinkedList<Long> list = new LinkedList<Long>();
+ list.add(offset);
+ map.put(key, list);
+ }
+ }
+
+ public TreeMap<Tuple, LinkedList<Long>> getMap() {
+ return this.map;
+ }
+
+ public void clear() {
+ this.map.clear();
+ }
+ }
+ }
+
+ /**
+ * BSTIndexReader is thread-safe.
+ */
+  public class BSTIndexReader implements OrderIndexReader, Closeable {
+ private Path fileName;
+ private Schema keySchema;
+ private TupleComparator comparator;
+
+ private FileSystem fs;
+ private FSDataInputStream indexIn;
+ private FSDataInputStream subIn;
+
+ private int level;
+ private int entryNum;
+ private int loadNum = -1;
+ private Tuple firstKey;
+ private Tuple lastKey;
+
+ // the cursors of BST
+ private int rootCursor;
+ private int keyCursor;
+ private int offsetCursor;
+
+ // mutex
+ private final Object mutex = new Object();
+
+ private RowStoreDecoder rowStoreDecoder;
+
+    /**
+     * Creates a reader with an explicit key schema and comparator.
+     *
+     * @param fileName the path of the index file
+     * @param keySchema the schema of the index keys
+     * @param comparator the comparator that orders the index keys
+     * @throws java.io.IOException
+     */
+ public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException {
+ this.fileName = fileName;
+ this.keySchema = keySchema;
+ this.comparator = comparator;
+ this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
+ }
+
+ public BSTIndexReader(final Path fileName) throws IOException {
+ this.fileName = fileName;
+ }
+
+ public Schema getKeySchema() {
+ return this.keySchema;
+ }
+
+ public TupleComparator getComparator() {
+ return this.comparator;
+ }
+
+ private void readHeader() throws IOException {
+ // schema
+ int schemaByteSize = indexIn.readInt();
+ byte [] schemaBytes = new byte[schemaByteSize];
+ StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize);
+
+ SchemaProto.Builder builder = SchemaProto.newBuilder();
+ builder.mergeFrom(schemaBytes);
+ SchemaProto proto = builder.build();
+ this.keySchema = new Schema(proto);
+ this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema);
+
+ // comparator
+ int compByteSize = indexIn.readInt();
+ byte [] compBytes = new byte[compByteSize];
+ StorageUtil.readFully(indexIn, compBytes, 0, compByteSize);
+
+ TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder();
+ compProto.mergeFrom(compBytes);
+ this.comparator = new BaseTupleComparator(compProto.build());
+
+ // level
+ this.level = indexIn.readInt();
+ // entry
+ this.entryNum = indexIn.readInt();
+      if (entryNum > 0) { // if there are no entries, do not read the firstKey/lastKey values
+ byte [] minBytes = new byte[indexIn.readInt()];
+ StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length);
+ this.firstKey = rowStoreDecoder.toTuple(minBytes);
+
+ byte [] maxBytes = new byte[indexIn.readInt()];
+ StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length);
+ this.lastKey = rowStoreDecoder.toTuple(maxBytes);
+ }
+ }
+
+    public void open() throws IOException {
+ /* init the index file */
+ fs = fileName.getFileSystem(conf);
+ if (!fs.exists(fileName)) {
+ throw new FileNotFoundException("ERROR: does not exist " + fileName.toString());
+ }
+
+ indexIn = fs.open(this.fileName);
+ readHeader();
+ fillData();
+ }
+
+ private void fillData() throws IOException {
+ /* load on memory */
+ if (this.level == TWO_LEVEL_INDEX) {
+
+ Path rootPath = new Path(this.fileName + ".root");
+ if (!fs.exists(rootPath)) {
+ throw new FileNotFoundException("root index did not created");
+ }
+
+ subIn = indexIn;
+ indexIn = fs.open(rootPath);
+      /* read the root index header: loadNum => entryNum */
+ this.loadNum = indexIn.readInt();
+ this.entryNum = indexIn.readInt();
+ fillRootIndex(entryNum, indexIn);
+
+ } else {
+ fillLeafIndex(entryNum, indexIn, -1);
+ }
+ }
+
+    /**
+     * Finds the offset of the given key in the index.
+     *
+     * @param key the key to look up
+     * @return the offset of the key, or -1 if the key cannot be located
+     * @throws java.io.IOException
+     */
+ public long find(Tuple key) throws IOException {
+ return find(key, false);
+ }
+
+ @Override
+ public long find(Tuple key, boolean nextKey) throws IOException {
+ synchronized (mutex) {
+ int pos = -1;
+ if (this.level == ONE_LEVEL_INDEX) {
+ pos = oneLevBS(key);
+ } else if (this.level == TWO_LEVEL_INDEX) {
+ pos = twoLevBS(key, this.loadNum + 1);
+ } else {
+ throw new IOException("More than TWL_LEVEL_INDEX is not supported.");
+ }
+
+ if (nextKey) {
+ if (pos + 1 >= this.offsetSubIndex.length) {
+ return -1;
+ }
+ keyCursor = pos + 1;
+ offsetCursor = 0;
+ } else {
+ if (correctable) {
+ keyCursor = pos;
+ offsetCursor = 0;
+ } else {
+ return -1;
+ }
+ }
+
+ return this.offsetSubIndex[keyCursor][offsetCursor];
+ }
+ }
+
+ public long next() throws IOException {
+ synchronized (mutex) {
+ if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) {
+ offsetCursor++;
+ } else {
+ if (offsetSubIndex.length - 1 > keyCursor) {
+ keyCursor++;
+ offsetCursor = 0;
+ } else {
+          if (offsetIndex.length - 1 > rootCursor) {
+ rootCursor++;
+ fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]);
+ keyCursor = 1;
+ offsetCursor = 0;
+ } else {
+ return -1;
+ }
+ }
+ }
+
+ return this.offsetSubIndex[keyCursor][offsetCursor];
+ }
+ }
+
+ public boolean isCurInMemory() {
+ return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor);
+ }
+
+ private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos)
+ throws IOException {
+ int counter = 0;
+ try {
+ if (pos != -1) {
+ in.seek(pos);
+ }
+ this.dataSubIndex = new Tuple[entryNum];
+ this.offsetSubIndex = new long[entryNum][];
+
+ byte[] buf;
+ for (int i = 0; i < entryNum; i++) {
+ counter++;
+ buf = new byte[in.readInt()];
+ StorageUtil.readFully(in, buf, 0, buf.length);
+ dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
+
+ int offsetNum = in.readInt();
+ this.offsetSubIndex[i] = new long[offsetNum];
+ for (int j = 0; j < offsetNum; j++) {
+ this.offsetSubIndex[i][j] = in.readLong();
+ }
+
+ }
+
+ } catch (IOException e) {
+ counter--;
+ if (pos != -1) {
+ in.seek(pos);
+ }
+ this.dataSubIndex = new Tuple[counter];
+ this.offsetSubIndex = new long[counter][];
+
+ byte[] buf;
+ for (int i = 0; i < counter; i++) {
+ buf = new byte[in.readInt()];
+ StorageUtil.readFully(in, buf, 0, buf.length);
+ dataSubIndex[i] = rowStoreDecoder.toTuple(buf);
+
+ int offsetNum = in.readInt();
+ this.offsetSubIndex[i] = new long[offsetNum];
+ for (int j = 0; j < offsetNum; j++) {
+ this.offsetSubIndex[i][j] = in.readLong();
+ }
+
+ }
+ }
+ }
+
+ public Tuple getFirstKey() {
+ return this.firstKey;
+ }
+
+ public Tuple getLastKey() {
+ return this.lastKey;
+ }
+
+ private void fillRootIndex(int entryNum, FSDataInputStream in)
+ throws IOException {
+ this.dataIndex = new Tuple[entryNum];
+ this.offsetIndex = new long[entryNum];
+ Tuple keyTuple;
+ byte[] buf;
+ for (int i = 0; i < entryNum; i++) {
+ buf = new byte[in.readInt()];
+ StorageUtil.readFully(in, buf, 0, buf.length);
+ keyTuple = rowStoreDecoder.toTuple(buf);
+ dataIndex[i] = keyTuple;
+ this.offsetIndex[i] = in.readLong();
+ }
+ }
+
+ /* memory index, only one is used. */
+ private Tuple[] dataIndex = null;
+ private Tuple[] dataSubIndex = null;
+
+ /* offset index */
+ private long[] offsetIndex = null;
+ private long[][] offsetSubIndex = null;
+
+ private boolean correctable = true;
+
+ private int oneLevBS(Tuple key) throws IOException {
+ correctable = true;
+ int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+ return pos;
+ }
+
+ private int twoLevBS(Tuple key, int loadNum) throws IOException {
+ int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length);
+    if (pos > 0) {
+ rootCursor = pos;
+ } else {
+ rootCursor = 0;
+ }
+ fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]);
+ pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length);
+
+ return pos;
+ }
+
+ private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) {
+ int offset = -1;
+ int start = startPos;
+ int end = endPos;
+
+ //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541
+ int centerPos = (start + end) >>> 1;
+ while (true) {
+ if (comparator.compare(arr[centerPos], key) > 0) {
+ if (centerPos == 0) {
+ correctable = false;
+ break;
+ } else if (comparator.compare(arr[centerPos - 1], key) < 0) {
+ correctable = false;
+ offset = centerPos - 1;
+ break;
+ } else {
+ end = centerPos;
+          centerPos = (start + end) >>> 1; // overflow-safe midpoint, see the bug link above
+ }
+ } else if (comparator.compare(arr[centerPos], key) < 0) {
+ if (centerPos == arr.length - 1) {
+ correctable = false;
+ offset = centerPos;
+ break;
+ } else if (comparator.compare(arr[centerPos + 1], key) > 0) {
+ correctable = false;
+ offset = centerPos;
+ break;
+ } else {
+ start = centerPos + 1;
+          centerPos = (start + end) >>> 1; // overflow-safe midpoint, see the bug link above
+ }
+ } else {
+ correctable = true;
+ offset = centerPos;
+ break;
+ }
+ }
+ return offset;
+ }
+
+ @Override
+    public void close() throws IOException {
+      this.indexIn.close();
+      if (this.subIn != null) { // subIn is only set for a two-level index
+        this.subIn.close();
+      }
+    }
+
+ @Override
+ public String toString() {
+ return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName;
+ }
+ }
+}
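
For reference, a minimal usage sketch of the index API above; this is not
part of the commit, and conf, keySchema, comparator, and keyTuple are
placeholders assumed to be built elsewhere:

    BSTIndex bst = new BSTIndex(conf);
    BSTIndex.BSTIndexWriter writer = bst.getIndexWriter(
        new Path("/tajo/warehouse/t1.idx"), BSTIndex.TWO_LEVEL_INDEX,
        keySchema, comparator);
    writer.setLoadNum(4096);           // leaf keys per root-index entry
    writer.open();
    writer.write(keyTuple, 128L);      // key plus the row's offset in the data file
    writer.close();                    // writes the header, the sorted entries and,
                                       // for a two-level index, the ".root" file

    BSTIndex.BSTIndexReader reader = bst.getIndexReader(new Path("/tajo/warehouse/t1.idx"));
    reader.open();                     // reads the header; loads the root index
    long offset = reader.find(keyTuple);   // -1 if the key cannot be located
    long next = reader.next();             // following offset, or -1 at the end
    reader.close();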
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
new file mode 100644
index 0000000..dfe36f6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import io.netty.buffer.ByteBuf;
+import net.minidev.json.JSONArray;
+import net.minidev.json.JSONObject;
+import net.minidev.json.parser.JSONParser;
+import net.minidev.json.parser.ParseException;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineParsingError;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class JsonLineDeserializer extends TextLineDeserializer {
+ private JSONParser parser;
+ private Type [] types;
+ private String [] columnNames;
+
+ public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+ super(schema, meta, targetColumnIndexes);
+ }
+
+ @Override
+ public void init() {
+ types = SchemaUtil.toTypes(schema);
+ columnNames = SchemaUtil.toSimpleNames(schema);
+
+ parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE);
+ }
+
+ @Override
+ public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError {
+ byte [] line = new byte[buf.readableBytes()];
+ buf.readBytes(line);
+
+ try {
+ JSONObject object = (JSONObject) parser.parse(line);
+
+ for (int i = 0; i < targetColumnIndexes.length; i++) {
+ int actualIdx = targetColumnIndexes[i];
+ String fieldName = columnNames[actualIdx];
+
+ if (!object.containsKey(fieldName)) {
+ output.put(actualIdx, NullDatum.get());
+ continue;
+ }
+
+ switch (types[actualIdx]) {
+ case BOOLEAN:
+ String boolStr = object.getAsString(fieldName);
+ if (boolStr != null) {
+ output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true")));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case CHAR:
+ String charStr = object.getAsString(fieldName);
+ if (charStr != null) {
+ output.put(actualIdx, DatumFactory.createChar(charStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case INT1:
+ case INT2:
+ Number int2Num = object.getAsNumber(fieldName);
+ if (int2Num != null) {
+ output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue()));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case INT4:
+ Number int4Num = object.getAsNumber(fieldName);
+ if (int4Num != null) {
+ output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue()));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case INT8:
+ Number int8Num = object.getAsNumber(fieldName);
+ if (int8Num != null) {
+ output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue()));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case FLOAT4:
+ Number float4Num = object.getAsNumber(fieldName);
+ if (float4Num != null) {
+ output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue()));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case FLOAT8:
+ Number float8Num = object.getAsNumber(fieldName);
+ if (float8Num != null) {
+ output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue()));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case TEXT:
+ String textStr = object.getAsString(fieldName);
+ if (textStr != null) {
+ output.put(actualIdx, DatumFactory.createText(textStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case TIMESTAMP:
+ String timestampStr = object.getAsString(fieldName);
+ if (timestampStr != null) {
+ output.put(actualIdx, DatumFactory.createTimestamp(timestampStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case TIME:
+ String timeStr = object.getAsString(fieldName);
+ if (timeStr != null) {
+ output.put(actualIdx, DatumFactory.createTime(timeStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case DATE:
+ String dateStr = object.getAsString(fieldName);
+ if (dateStr != null) {
+ output.put(actualIdx, DatumFactory.createDate(dateStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ case BIT:
+ case BINARY:
+ case VARBINARY:
+ case BLOB: {
+ Object jsonObject = object.get(fieldName);
+
+ if (jsonObject == null) {
+ output.put(actualIdx, NullDatum.get());
+ break;
+ }
+ if (jsonObject instanceof String) {
+ output.put(actualIdx, DatumFactory.createBlob((String) jsonObject));
+ } else if (jsonObject instanceof JSONArray) {
+ JSONArray jsonArray = (JSONArray) jsonObject;
+ byte[] bytes = new byte[jsonArray.size()];
+ Iterator<Object> it = jsonArray.iterator();
+ int arrayIdx = 0;
+ while (it.hasNext()) {
+ bytes[arrayIdx++] = ((Long) it.next()).byteValue();
+ }
+ if (bytes.length > 0) {
+ output.put(actualIdx, DatumFactory.createBlob(bytes));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+ } else {
+ throw new IOException("Unknown json object: " + object.getClass().getSimpleName());
+ }
+ break;
+ }
+ case INET4:
+ String inetStr = object.getAsString(fieldName);
+ if (inetStr != null) {
+ output.put(actualIdx, DatumFactory.createInet4(inetStr));
+ } else {
+ output.put(actualIdx, NullDatum.get());
+ }
+ break;
+
+ case NULL_TYPE:
+ output.put(actualIdx, NullDatum.get());
+ break;
+
+ default:
+ throw new NotImplementedException(types[actualIdx].name() + " is not supported.");
+ }
+ }
+ } catch (ParseException pe) {
+ throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe);
+ } catch (Throwable e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public void release() {
+ }
+}
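
A rough sketch of driving the deserializer above; this is not from the
commit, schema and meta are assumed, and Unpooled/VTuple come from Netty
and Tajo respectively:

    int[] targets = new int[] { 0, 1 };         // columns to materialize
    JsonLineDeserializer deser = new JsonLineDeserializer(schema, meta, targets);
    deser.init();

    ByteBuf line = Unpooled.copiedBuffer("{\"id\": 7, \"name\": \"tajo\"}",
        TextDatum.DEFAULT_CHARSET);
    Tuple tuple = new VTuple(schema.size());
    deser.deserialize(line, tuple);             // absent keys become NullDatum
    deser.release();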
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
new file mode 100644
index 0000000..6db2c29
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.text.TextLineDeserializer;
+import org.apache.tajo.storage.text.TextLineSerDe;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+public class JsonLineSerDe extends TextLineSerDe {
+ @Override
+ public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) {
+ return new JsonLineDeserializer(schema, meta, targetColumnIndexes);
+ }
+
+ @Override
+ public TextLineSerializer createSerializer(Schema schema, TableMeta meta) {
+ return new JsonLineSerializer(schema, meta);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
new file mode 100644
index 0000000..cd31ada
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.json;
+
+
+import net.minidev.json.JSONObject;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.SchemaUtil;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.text.TextLineSerializer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class JsonLineSerializer extends TextLineSerializer {
+ private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
+
+ private Type [] types;
+ private String [] simpleNames;
+ private int columnNum;
+
+
+ public JsonLineSerializer(Schema schema, TableMeta meta) {
+ super(schema, meta);
+ }
+
+ @Override
+ public void init() {
+ types = SchemaUtil.toTypes(schema);
+ simpleNames = SchemaUtil.toSimpleNames(schema);
+ columnNum = schema.size();
+ }
+
+ @Override
+ public int serialize(OutputStream out, Tuple input) throws IOException {
+ JSONObject jsonObject = new JSONObject();
+
+ for (int i = 0; i < columnNum; i++) {
+ if (input.isNull(i)) {
+ continue;
+ }
+
+ String fieldName = simpleNames[i];
+ Type type = types[i];
+
+ switch (type) {
+
+ case BOOLEAN:
+ jsonObject.put(fieldName, input.getBool(i));
+ break;
+
+ case INT1:
+ case INT2:
+ jsonObject.put(fieldName, input.getInt2(i));
+ break;
+
+ case INT4:
+ jsonObject.put(fieldName, input.getInt4(i));
+ break;
+
+ case INT8:
+ jsonObject.put(fieldName, input.getInt8(i));
+ break;
+
+ case FLOAT4:
+ jsonObject.put(fieldName, input.getFloat4(i));
+ break;
+
+ case FLOAT8:
+ jsonObject.put(fieldName, input.getFloat8(i));
+ break;
+
+ case CHAR:
+ case TEXT:
+ case VARCHAR:
+ case INET4:
+ case TIMESTAMP:
+ case DATE:
+ case TIME:
+ case INTERVAL:
+ jsonObject.put(fieldName, input.getText(i));
+ break;
+
+ case BIT:
+ case BINARY:
+ case BLOB:
+ case VARBINARY:
+ jsonObject.put(fieldName, input.getBytes(i));
+ break;
+
+ case NULL_TYPE:
+ break;
+
+ default:
+ throw new NotImplementedException(types[i].name() + " is not supported.");
+ }
+ }
+
+ String jsonStr = jsonObject.toJSONString();
+ byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET);
+ out.write(jsonBytes);
+ return jsonBytes.length;
+ }
+
+ @Override
+ public void release() {
+
+ }
+}
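
And the reverse direction with the serializer above (again a sketch;
schema, meta, and tuple are assumed, and appending the line delimiter is
left to the caller here):

    JsonLineSerializer ser = new JsonLineSerializer(schema, meta);
    ser.init();

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    int written = ser.serialize(out, tuple);    // null columns are simply omitted
    out.write('\n');                            // delimiter appended by the caller
    ser.release();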
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
new file mode 100644
index 0000000..b10d423
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.storage.StorageConstants;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.TableStatistics;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+
+/**
+ * FileAppender for writing to Parquet files.
+ */
+public class ParquetAppender extends FileAppender {
+ private TajoParquetWriter writer;
+ private int blockSize;
+ private int pageSize;
+ private CompressionCodecName compressionCodecName;
+ private boolean enableDictionary;
+ private boolean validating;
+ private TableStatistics stats;
+
+ /**
+ * Creates a new ParquetAppender.
+ *
+ * @param conf Configuration properties.
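+   * @param taskAttemptId The task attempt id.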
+ * @param schema The table schema.
+ * @param meta The table metadata.
+ * @param workDir The path of the Parquet file to write to.
+ */
+ public ParquetAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema, TableMeta meta,
+ Path workDir) throws IOException {
+ super(conf, taskAttemptId, schema, meta, workDir);
+ this.blockSize = Integer.parseInt(
+ meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE));
+ this.pageSize = Integer.parseInt(
+ meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE));
+ this.compressionCodecName = CompressionCodecName.fromConf(
+ meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME));
+ this.enableDictionary = Boolean.parseBoolean(
+ meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED));
+ this.validating = Boolean.parseBoolean(
+ meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED));
+ }
+
+ /**
+ * Initializes the Appender. This method creates a new TajoParquetWriter
+ * and initializes the table statistics if enabled.
+ */
+ public void init() throws IOException {
+ writer = new TajoParquetWriter(path,
+ schema,
+ compressionCodecName,
+ blockSize,
+ pageSize,
+ enableDictionary,
+ validating);
+ if (enabledStats) {
+ this.stats = new TableStatistics(schema);
+ }
+ super.init();
+ }
+
+ /**
+   * Gets the current offset. Tracking offsets is currently not implemented, so
+ * this method always returns 0.
+ *
+ * @return 0
+ */
+ @Override
+ public long getOffset() throws IOException {
+ return 0;
+ }
+
+ /**
+ * Write a Tuple to the Parquet file.
+ *
+ * @param tuple The Tuple to write.
+ */
+ @Override
+ public void addTuple(Tuple tuple) throws IOException {
+ if (enabledStats) {
+ for (int i = 0; i < schema.size(); ++i) {
+ stats.analyzeField(i, tuple.get(i));
+ }
+ }
+ writer.write(tuple);
+ if (enabledStats) {
+ stats.incrementRow();
+ }
+ }
+
+ /**
+ * The ParquetWriter does not need to be flushed, so this is a no-op.
+ */
+ @Override
+ public void flush() throws IOException {
+ }
+
+ /**
+ * Closes the Appender.
+ */
+ @Override
+ public void close() throws IOException {
+ writer.close();
+ }
+
+ public long getEstimatedOutputSize() throws IOException {
+ return writer.getEstimatedWrittenSize();
+ }
+
+ /**
+ * If table statistics is enabled, retrieve the table statistics.
+ *
+ * @return Table statistics if enabled or null otherwise.
+ */
+ @Override
+ public TableStats getStats() {
+ if (enabledStats) {
+ return stats.getTableStat();
+ } else {
+ return null;
+ }
+ }
+}
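
A sketch of configuring and using the appender above. The option keys are
the ParquetOutputFormat constants the constructor reads; conf, attemptId,
schema, meta, and tuple are placeholders, and the putOption setter on
TableMeta is an assumption here:

    meta.putOption(ParquetOutputFormat.COMPRESSION, "SNAPPY");    // assumed setter
    meta.putOption(ParquetOutputFormat.BLOCK_SIZE, "134217728");  // 128 MB

    ParquetAppender appender = new ParquetAppender(
        conf, attemptId, schema, meta, new Path("/tajo/out/part-00.parquet"));
    appender.init();
    appender.addTuple(tuple);
    appender.close();
    TableStats stats = appender.getStats();     // null unless statistics are enabled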
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
new file mode 100644
index 0000000..2f8efcf
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+/**
+ * FileScanner for reading Parquet files.
+ */
+public class ParquetScanner extends FileScanner {
+ private TajoParquetReader reader;
+
+ /**
+ * Creates a new ParquetScanner.
+ *
+   * @param conf The Hadoop configuration.
+   * @param schema The table schema.
+   * @param meta The table metadata.
+   * @param fragment The file fragment to scan.
+ */
+ public ParquetScanner(Configuration conf, final Schema schema,
+ final TableMeta meta, final Fragment fragment) {
+ super(conf, schema, meta, fragment);
+ }
+
+ /**
+ * Initializes the ParquetScanner. This method initializes the
+ * TajoParquetReader.
+ */
+ @Override
+ public void init() throws IOException {
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+ reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets));
+ super.init();
+ }
+
+ /**
+ * Reads the next Tuple from the Parquet file.
+ *
+ * @return The next Tuple from the Parquet file or null if end of file is
+ * reached.
+ */
+ @Override
+ public Tuple next() throws IOException {
+ return reader.read();
+ }
+
+ /**
+   * Resets the scanner. This is currently a no-op.
+ */
+ @Override
+ public void reset() throws IOException {
+ }
+
+ /**
+ * Closes the scanner.
+ */
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ /**
+ * Returns whether this scanner is projectable.
+ *
+ * @return true
+ */
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ /**
+ * Returns whether this scanner is selectable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ /**
+ * Returns whether this scanner is splittable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+}
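
Scanning those tuples back out is symmetrical; in this sketch, fragment is
assumed to be a fragment pointing at a Parquet file:

    ParquetScanner scanner = new ParquetScanner(conf, schema, meta, fragment);
    scanner.init();                   // opens a TajoParquetReader on fragment.getPath()
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {
      // consume tuple
    }
    scanner.close();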
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
new file mode 100644
index 0000000..a765f48
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.thirdparty.parquet.ParquetReader;
+import parquet.filter.UnboundRecordFilter;
+
+import java.io.IOException;
+
+/**
+ * Tajo implementation of {@link ParquetReader} to read Tajo records from a
+ * Parquet file. Users should use {@link ParquetScanner} and not this class
+ * directly.
+ */
+public class TajoParquetReader extends ParquetReader<Tuple> {
+ /**
+ * Creates a new TajoParquetReader.
+ *
+ * @param file The file to read from.
+ * @param readSchema Tajo schema of the table.
+ */
+ public TajoParquetReader(Path file, Schema readSchema) throws IOException {
+ super(file, new TajoReadSupport(readSchema));
+ }
+
+ /**
+ * Creates a new TajoParquetReader.
+ *
+ * @param file The file to read from.
+ * @param readSchema Tajo schema of the table.
+ * @param requestedSchema Tajo schema of the projection.
+ */
+ public TajoParquetReader(Path file, Schema readSchema,
+ Schema requestedSchema) throws IOException {
+ super(file, new TajoReadSupport(readSchema, requestedSchema));
+ }
+
+ /**
+ * Creates a new TajoParquetReader.
+ *
+ * @param file The file to read from.
+ * @param readSchema Tajo schema of the table.
+ * @param recordFilter Record filter.
+ */
+ public TajoParquetReader(Path file, Schema readSchema,
+ UnboundRecordFilter recordFilter)
+ throws IOException {
+ super(file, new TajoReadSupport(readSchema), recordFilter);
+ }
+
+ /**
+ * Creates a new TajoParquetReader.
+ *
+ * @param file The file to read from.
+ * @param readSchema Tajo schema of the table.
+ * @param requestedSchema Tajo schema of the projection.
+ * @param recordFilter Record filter.
+ */
+ public TajoParquetReader(Path file, Schema readSchema,
+ Schema requestedSchema,
+ UnboundRecordFilter recordFilter)
+ throws IOException {
+ super(file, new TajoReadSupport(readSchema, requestedSchema),
+ recordFilter);
+ }
+}
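
A direct-use sketch of the reader (ParquetScanner is the intended entry
point, as the javadoc notes); tableSchema and projection are assumed:

    TajoParquetReader reader = new TajoParquetReader(
        new Path("/tajo/out/part-00.parquet"), tableSchema, projection);
    Tuple tuple;
    while ((tuple = reader.read()) != null) {   // read() is inherited from ParquetReader<Tuple>
      // consume tuple
    }
    reader.close();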
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
new file mode 100644
index 0000000..5f220c5
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+
+/**
+ * Tajo implementation of {@link ParquetWriter} to write Tajo records to a
+ * Parquet file. Users should use {@link ParquetAppender} and not this class
+ * directly.
+ */
+public class TajoParquetWriter extends ParquetWriter<Tuple> {
+ /**
+ * Create a new TajoParquetWriter
+ *
+ * @param file The file name to write to.
+ * @param schema The Tajo schema of the table.
+ * @param compressionCodecName Compression codec to use, or
+ * CompressionCodecName.UNCOMPRESSED.
+ * @param blockSize The block size threshold.
+   * @param pageSize The page size. Parquet blocks are subdivided into pages
+   * for alignment.
+ * @throws java.io.IOException
+ */
+ public TajoParquetWriter(Path file,
+ Schema schema,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize) throws IOException {
+ super(file,
+ new TajoWriteSupport(schema),
+ compressionCodecName,
+ blockSize,
+ pageSize);
+ }
+
+ /**
+ * Create a new TajoParquetWriter.
+ *
+ * @param file The file name to write to.
+ * @param schema The Tajo schema of the table.
+ * @param compressionCodecName Compression codec to use, or
+ * CompressionCodecName.UNCOMPRESSED.
+ * @param blockSize The block size threshold.
+   * @param pageSize The page size. Parquet blocks are subdivided into pages
+   * for alignment.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @param validating Whether to turn on validation.
+ * @throws java.io.IOException
+ */
+ public TajoParquetWriter(Path file,
+ Schema schema,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ super(file,
+ new TajoWriteSupport(schema),
+ compressionCodecName,
+ blockSize,
+ pageSize,
+ enableDictionary,
+ validating);
+ }
+
+ /**
+ * Creates a new TajoParquetWriter. The default block size is 128 MB.
+ * The default page size is 1 MB. Default compression is no compression.
+ *
+ * @param file The Path of the file to write to.
+ * @param schema The Tajo schema of the table.
+ * @throws java.io.IOException
+ */
+ public TajoParquetWriter(Path file, Schema schema) throws IOException {
+ this(file,
+ schema,
+ CompressionCodecName.UNCOMPRESSED,
+ DEFAULT_BLOCK_SIZE,
+ DEFAULT_PAGE_SIZE);
+ }
+}
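
Likewise for the writer; write(Tuple) and close() are inherited from
ParquetWriter<Tuple>, and the two-argument constructor uses the
uncompressed defaults described above:

    TajoParquetWriter writer = new TajoParquetWriter(
        new Path("/tajo/out/part-01.parquet"), schema);
    writer.write(tuple);
    writer.close();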
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
new file mode 100644
index 0000000..a64e987
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import parquet.Log;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * Tajo implementation of {@link parquet.hadoop.api.ReadSupport} for {@link org.apache.tajo.storage.Tuple}s.
+ * Users should use {@link org.apache.tajo.storage.parquet.ParquetScanner} and not this class directly.
+ */
+public class TajoReadSupport extends ReadSupport<Tuple> {
+ private static final Log LOG = Log.getLog(TajoReadSupport.class);
+
+ private Schema readSchema;
+ private Schema requestedSchema;
+
+ /**
+ * Creates a new TajoReadSupport.
+ *
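+   * @param readSchema The Tajo schema of the table.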
+ * @param requestedSchema The Tajo schema of the requested projection passed
+ * down by ParquetScanner.
+ */
+ public TajoReadSupport(Schema readSchema, Schema requestedSchema) {
+ super();
+ this.readSchema = readSchema;
+ this.requestedSchema = requestedSchema;
+ }
+
+ /**
+ * Creates a new TajoReadSupport.
+ *
+ * @param readSchema The schema of the table.
+ */
+ public TajoReadSupport(Schema readSchema) {
+ super();
+ this.readSchema = readSchema;
+ this.requestedSchema = readSchema;
+ }
+
+ /**
+ * Initializes the ReadSupport.
+ *
+ * @param context The InitContext.
+ * @return A ReadContext that defines how to read the file.
+ */
+ @Override
+ public ReadContext init(InitContext context) {
+ if (requestedSchema == null) {
+ throw new RuntimeException("requestedSchema is null.");
+ }
+ MessageType requestedParquetSchema =
+ new TajoSchemaConverter().convert(requestedSchema);
+ LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
+ return new ReadContext(requestedParquetSchema);
+ }
+
+ /**
+ * Prepares for read.
+ *
+ * @param configuration The job configuration.
+ * @param keyValueMetaData App-specific metadata from the file.
+ * @param fileSchema The schema of the Parquet file.
+ * @param readContext Returned by the init method.
+ */
+ @Override
+ public RecordMaterializer<Tuple> prepareForRead(
+ Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadContext readContext) {
+ MessageType parquetRequestedSchema = readContext.getRequestedSchema();
+ return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
new file mode 100644
index 0000000..a091eac
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -0,0 +1,380 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.*;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Converter to convert a Parquet record into a Tajo Tuple.
+ */
+public class TajoRecordConverter extends GroupConverter {
+ private final GroupType parquetSchema;
+ private final Schema tajoReadSchema;
+ private final int[] projectionMap;
+ private final int tupleSize;
+
+ private final Converter[] converters;
+
+ private Tuple currentTuple;
+
+ /**
+ * Creates a new TajoRecordConverter.
+ *
+ * @param parquetSchema The Parquet schema of the projection.
+ * @param tajoReadSchema The Tajo schema of the table.
+ * @param projectionMap An array mapping the projection column to the column
+ * index in the table.
+ */
+ public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema,
+ int[] projectionMap) {
+ this.parquetSchema = parquetSchema;
+ this.tajoReadSchema = tajoReadSchema;
+ this.projectionMap = projectionMap;
+ this.tupleSize = tajoReadSchema.size();
+
+ // The projectionMap.length does not match parquetSchema.getFieldCount()
+ // when the projection contains NULL_TYPE columns. We will skip over the
+ // NULL_TYPE columns when we construct the converters and populate the
+ // NULL_TYPE columns with NullDatums in start().
+ int index = 0;
+ this.converters = new Converter[parquetSchema.getFieldCount()];
+ for (int i = 0; i < projectionMap.length; ++i) {
+ final int projectionIndex = projectionMap[i];
+ Column column = tajoReadSchema.getColumn(projectionIndex);
+ if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
+ continue;
+ }
+ Type type = parquetSchema.getType(index);
+ converters[index] = newConverter(column, type, new ParentValueContainer() {
+ @Override
+ void add(Object value) {
+ TajoRecordConverter.this.set(projectionIndex, value);
+ }
+ });
+ ++index;
+ }
+ }
+
+ private void set(int index, Object value) {
+ currentTuple.put(index, (Datum)value);
+ }
+
+ private Converter newConverter(Column column, Type type,
+ ParentValueContainer parent) {
+ DataType dataType = column.getDataType();
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ return new FieldBooleanConverter(parent);
+ case BIT:
+ return new FieldBitConverter(parent);
+ case CHAR:
+ return new FieldCharConverter(parent);
+ case INT2:
+ return new FieldInt2Converter(parent);
+ case INT4:
+ return new FieldInt4Converter(parent);
+ case INT8:
+ return new FieldInt8Converter(parent);
+ case FLOAT4:
+ return new FieldFloat4Converter(parent);
+ case FLOAT8:
+ return new FieldFloat8Converter(parent);
+ case INET4:
+ return new FieldInet4Converter(parent);
+ case INET6:
+ throw new RuntimeException("No converter for INET6");
+ case TEXT:
+ return new FieldTextConverter(parent);
+ case PROTOBUF:
+ return new FieldProtobufConverter(parent, dataType);
+ case BLOB:
+ return new FieldBlobConverter(parent);
+ case NULL_TYPE:
+ throw new RuntimeException("No converter for NULL_TYPE.");
+ default:
+ throw new RuntimeException("Unsupported data type");
+ }
+ }
+
+ /**
+ * Gets the converter for a specific field.
+ *
+ * @param fieldIndex Index of the field in the projection.
+ * @return The converter for the field.
+ */
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ /**
+   * Called before processing a record. This method creates a new empty
+   * Tuple for the record.
+ */
+ @Override
+ public void start() {
+ currentTuple = new VTuple(tupleSize);
+ }
+
+ /**
+   * Called after all fields have been processed. It fills any fields that
+   * are still null or have type NULL_TYPE with a NullDatum.
+ */
+ @Override
+ public void end() {
+ for (int i = 0; i < projectionMap.length; ++i) {
+ final int projectionIndex = projectionMap[i];
+ Column column = tajoReadSchema.getColumn(projectionIndex);
+ if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
+ || currentTuple.get(projectionIndex) == null) {
+ set(projectionIndex, NullDatum.get());
+ }
+ }
+ }
+
+ /**
+ * Returns the current record converted by this converter.
+ *
+ * @return The current record.
+ */
+ public Tuple getCurrentRecord() {
+ return currentTuple;
+ }
+
+ static abstract class ParentValueContainer {
+ /**
+ * Adds the value to the parent.
+ *
+ * @param value The value to add.
+ */
+ abstract void add(Object value);
+ }
+
+ static final class FieldBooleanConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldBooleanConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBoolean(boolean value) {
+ parent.add(DatumFactory.createBool(value));
+ }
+ }
+
+ static final class FieldBitConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldBitConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createBit((byte)(value & 0xff)));
+ }
+ }
+
+ static final class FieldCharConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldCharConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(DatumFactory.createChar(value.getBytes()));
+ }
+ }
+
+ static final class FieldInt2Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt2Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createInt2((short)value));
+ }
+ }
+
+ static final class FieldInt4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createInt4(value));
+ }
+ }
+
+ static final class FieldInt8Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt8Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(DatumFactory.createInt8(value));
+ }
+
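+ // Parquet may deliver INT32 values for this column as well; widen them
+ // to INT8's long representation.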
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createInt8(Long.valueOf(value)));
+ }
+ }
+
+ static final class FieldFloat4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldFloat4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
+ }
+
+ @Override
+ final public void addFloat(float value) {
+ parent.add(DatumFactory.createFloat4(value));
+ }
+ }
+
+ static final class FieldFloat8Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldFloat8Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
+ }
+
+ @Override
+ final public void addFloat(float value) {
+ parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
+ }
+
+ @Override
+ final public void addDouble(double value) {
+ parent.add(DatumFactory.createFloat8(value));
+ }
+ }
+
+ static final class FieldInet4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInet4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(DatumFactory.createInet4(value.getBytes()));
+ }
+ }
+
+ static final class FieldTextConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldTextConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(DatumFactory.createText(value.getBytes()));
+ }
+ }
+
+ static final class FieldBlobConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldBlobConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes())));
+ }
+ }
+
+ static final class FieldProtobufConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+ private final DataType dataType;
+
+ public FieldProtobufConverter(ParentValueContainer parent,
+ DataType dataType) {
+ this.parent = parent;
+ this.dataType = dataType;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ try {
+ ProtobufDatumFactory factory =
+ ProtobufDatumFactory.get(dataType.getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(value.getBytes());
+ parent.add(factory.createDatum(builder));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
new file mode 100644
index 0000000..436159c
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.Tuple;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Materializes a Tajo Tuple from a stream of Parquet data.
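+ *
+ * <p>
+ * A minimal sketch of how a Parquet {@code ReadSupport} could wire this up
+ * ({@code projectionSchema} and {@code tableSchema} are illustrative names,
+ * not part of this patch):
+ * </p>
+ * <pre>{@code
+ * public RecordMaterializer<Tuple> prepareForRead(Configuration conf,
+ * Map<String, String> keyValueMetaData, MessageType fileSchema,
+ * ReadContext readContext) {
+ * return new TajoRecordMaterializer(readContext.getRequestedSchema(),
+ * projectionSchema, tableSchema);
+ * }
+ * }</pre>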
+ */
+class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
+ private final TajoRecordConverter root;
+
+ /**
+ * Creates a new TajoRecordMaterializer.
+ *
+ * @param parquetSchema The Parquet schema of the projection.
+ * @param tajoSchema The Tajo schema of the projection.
+ * @param tajoReadSchema The Tajo schema of the table.
+ */
+ public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
+ Schema tajoReadSchema) {
+ int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
+ this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
+ projectionMap);
+ }
+
+ private int[] getProjectionMap(Schema schema, Schema projection) {
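+ // For example, a table schema (a, b, c) with projection (c, a) yields
+ // the map {2, 0}.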
+ Column[] targets = projection.toArray();
+ int[] projectionMap = new int[targets.length];
+ for (int i = 0; i < targets.length; ++i) {
+ int tid = schema.getColumnId(targets[i].getQualifiedName());
+ projectionMap[i] = tid;
+ }
+ return projectionMap;
+ }
+
+ /**
+ * Returns the current record being materialized.
+ *
+ * @return The record being materialized.
+ */
+ @Override
+ public Tuple getCurrentRecord() {
+ return root.getCurrentRecord();
+ }
+
+ /**
+ * Returns the root converter.
+ *
+ * @return The root converter.
+ */
+ @Override
+ public GroupConverter getRootConverter() {
+ return root;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
new file mode 100644
index 0000000..555b623
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Converts between Parquet and Tajo schemas. See the package documentation
+ * for details on the type mapping.
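+ *
+ * <p>
+ * For example, a Tajo schema {@code (id INT8, name TEXT)} converts to a
+ * Parquet message type of the form
+ * {@code message table_schema { optional int64 id; optional binary name (UTF8); }}
+ * (a sketch of the printed form, not verbatim output).
+ * </p>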
+ */
+public class TajoSchemaConverter {
+ private static final String TABLE_SCHEMA = "table_schema";
+
+ /**
+ * Creates a new TajoSchemaConverter.
+ */
+ public TajoSchemaConverter() {
+ }
+
+ /**
+ * Converts a Parquet schema to a Tajo schema.
+ *
+ * @param parquetSchema The Parquet schema to convert.
+ * @return The resulting Tajo schema.
+ */
+ public Schema convert(MessageType parquetSchema) {
+ return convertFields(parquetSchema.getFields());
+ }
+
+ private Schema convertFields(List<Type> parquetFields) {
+ List<Column> columns = new ArrayList<Column>();
+ for (int i = 0; i < parquetFields.size(); ++i) {
+ Type fieldType = parquetFields.get(i);
+ if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
+ throw new RuntimeException("REPEATED not supported outside LIST or" +
+ " MAP. Type: " + fieldType);
+ }
+ columns.add(convertField(fieldType));
+ }
+ return new Schema(columns.toArray(new Column[columns.size()]));
+ }
+
+ private Column convertField(final Type fieldType) {
+ if (fieldType.isPrimitive()) {
+ return convertPrimitiveField(fieldType);
+ } else {
+ return convertComplexField(fieldType);
+ }
+ }
+
+ private Column convertPrimitiveField(final Type fieldType) {
+ final String fieldName = fieldType.getName();
+ final PrimitiveTypeName parquetPrimitiveTypeName =
+ fieldType.asPrimitiveType().getPrimitiveTypeName();
+ final OriginalType originalType = fieldType.getOriginalType();
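+ // Dispatch on the Parquet primitive type using Parquet's visitor-style
+ // converter; each callback maps one primitive type to a Tajo column.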
+ return parquetPrimitiveTypeName.convert(
+ new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() {
+ @Override
+ public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.BOOLEAN);
+ }
+
+ @Override
+ public Column convertINT32(PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.INT4);
+ }
+
+ @Override
+ public Column convertINT64(PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.INT8);
+ }
+
+ @Override
+ public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.FLOAT4);
+ }
+
+ @Override
+ public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.FLOAT8);
+ }
+
+ @Override
+ public Column convertFIXED_LEN_BYTE_ARRAY(
+ PrimitiveTypeName primitiveTypeName) {
+ return new Column(fieldName, TajoDataTypes.Type.BLOB);
+ }
+
+ @Override
+ public Column convertBINARY(PrimitiveTypeName primitiveTypeName) {
+ if (originalType == OriginalType.UTF8) {
+ return new Column(fieldName, TajoDataTypes.Type.TEXT);
+ } else {
+ return new Column(fieldName, TajoDataTypes.Type.BLOB);
+ }
+ }
+
+ @Override
+ public Column convertINT96(PrimitiveTypeName primitiveTypeName) {
+ throw new RuntimeException("Converting from INT96 not supported.");
+ }
+ });
+ }
+
+ private Column convertComplexField(final Type fieldType) {
+ throw new RuntimeException("Complex types not supported.");
+ }
+
+ /**
+ * Converts a Tajo schema to a Parquet schema.
+ *
+ * @param tajoSchema The Tajo schema to convert.
+ * @return The resulting Parquet schema.
+ */
+ public MessageType convert(Schema tajoSchema) {
+ List<Type> types = new ArrayList<Type>();
+ for (int i = 0; i < tajoSchema.size(); ++i) {
+ Column column = tajoSchema.getColumn(i);
+ if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
+ continue;
+ }
+ types.add(convertColumn(column));
+ }
+ return new MessageType(TABLE_SCHEMA, types);
+ }
+
+ private Type convertColumn(Column column) {
+ TajoDataTypes.Type type = column.getDataType().getType();
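+ // Parquet has no 8- or 16-bit integer types, so BIT and INT2 are widened
+ // to INT32; see the package documentation for the full mapping.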
+ switch (type) {
+ case BOOLEAN:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.BOOLEAN);
+ case BIT:
+ case INT2:
+ case INT4:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.INT32);
+ case INT8:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.INT64);
+ case FLOAT4:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.FLOAT);
+ case FLOAT8:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.DOUBLE);
+ case CHAR:
+ case TEXT:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.BINARY,
+ OriginalType.UTF8);
+ case PROTOBUF:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.BINARY);
+ case BLOB:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.BINARY);
+ case INET4:
+ case INET6:
+ return primitive(column.getSimpleName(),
+ PrimitiveTypeName.BINARY);
+ default:
+ throw new RuntimeException("Cannot convert Tajo type: " + type);
+ }
+ }
+
+ private PrimitiveType primitive(String name,
+ PrimitiveTypeName primitive) {
+ return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null);
+ }
+
+ private PrimitiveType primitive(String name,
+ PrimitiveTypeName primitive,
+ OriginalType originalType) {
+ return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name,
+ originalType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
new file mode 100644
index 0000000..e05aeaf
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.storage.Tuple;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tajo implementation of {@link parquet.hadoop.api.WriteSupport} for {@link org.apache.tajo.storage.Tuple}s.
+ * Users should use {@link ParquetAppender} and not this class directly.
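+ *
+ * <p>
+ * A minimal write sketch (assuming {@code parquet.hadoop.ParquetWriter};
+ * {@code path}, {@code tableSchema}, and {@code tuple} are illustrative):
+ * </p>
+ * <pre>{@code
+ * TajoWriteSupport support = new TajoWriteSupport(tableSchema);
+ * ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(path, support);
+ * writer.write(tuple);
+ * writer.close();
+ * }</pre>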
+ */
+public class TajoWriteSupport extends WriteSupport<Tuple> {
+ private RecordConsumer recordConsumer;
+ private MessageType rootSchema;
+ private Schema rootTajoSchema;
+
+ /**
+ * Creates a new TajoWriteSupport.
+ *
+ * @param tajoSchema The Tajo schema for the table.
+ */
+ public TajoWriteSupport(Schema tajoSchema) {
+ this.rootSchema = new TajoSchemaConverter().convert(tajoSchema);
+ this.rootTajoSchema = tajoSchema;
+ }
+
+ /**
+ * Initializes the WriteSupport.
+ *
+ * @param configuration The job's configuration.
+ * @return A WriteContext that describes how to write the file.
+ */
+ @Override
+ public WriteContext init(Configuration configuration) {
+ Map<String, String> extraMetaData = new HashMap<String, String>();
+ return new WriteContext(rootSchema, extraMetaData);
+ }
+
+ /**
+ * Called once per row group.
+ *
+ * @param recordConsumer The {@link parquet.io.api.RecordConsumer} to write to.
+ */
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ /**
+ * Writes a Tuple to the file.
+ *
+ * @param tuple The Tuple to write to the file.
+ */
+ @Override
+ public void write(Tuple tuple) {
+ recordConsumer.startMessage();
+ writeRecordFields(rootSchema, rootTajoSchema, tuple);
+ recordConsumer.endMessage();
+ }
+
+ private void writeRecordFields(GroupType schema, Schema tajoSchema,
+ Tuple tuple) {
+ List<Type> fields = schema.getFields();
+ // Parquet ignores Tajo NULL_TYPE columns, so the Parquet field index may
+ // differ from the Tajo column index.
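+ // For example, with a table (a INT4, b NULL_TYPE, c TEXT), column c has
+ // Tajo index 2 but Parquet field index 1.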
+ int index = 0;
+ for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) {
+ Column column = tajoSchema.getColumn(tajoIndex);
+ if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
+ continue;
+ }
+ Datum datum = tuple.get(tajoIndex);
+ Type fieldType = fields.get(index);
+ if (!tuple.isNull(tajoIndex)) {
+ recordConsumer.startField(fieldType.getName(), index);
+ writeValue(fieldType, column, datum);
+ recordConsumer.endField(fieldType.getName(), index);
+ } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
+ throw new RuntimeException("Null-value for required field: " +
+ column.getSimpleName());
+ }
+ ++index;
+ }
+ }
+
+ private void writeValue(Type fieldType, Column column, Datum datum) {
+ switch (column.getDataType().getType()) {
+ case BOOLEAN:
+ recordConsumer.addBoolean(datum.asBool());
+ break;
+ case BIT:
+ case INT2:
+ case INT4:
+ recordConsumer.addInteger(datum.asInt4());
+ break;
+ case INT8:
+ recordConsumer.addLong(datum.asInt8());
+ break;
+ case FLOAT4:
+ recordConsumer.addFloat(datum.asFloat4());
+ break;
+ case FLOAT8:
+ recordConsumer.addDouble(datum.asFloat8());
+ break;
+ case CHAR:
+ case TEXT:
+ recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
+ break;
+ case PROTOBUF:
+ case BLOB:
+ case INET4:
+ case INET6:
+ recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
+ break;
+ default:
+ // Unsupported types are rejected when the schema is converted, but fail
+ // fast here too instead of silently leaving a started field empty.
+ throw new RuntimeException("Cannot convert Tajo type: " +
+ column.getDataType().getType());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java
new file mode 100644
index 0000000..d7d16b7
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/package-info.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * <p>
+ * Provides read and write support for Parquet files. Tajo schemas are
+ * converted to Parquet schemas according to the following mapping of Tajo
+ * and Parquet types:
+ * </p>
+ *
+ * <table>
+ * <tr>
+ * <th>Tajo type</th>
+ * <th>Parquet type</th>
+ * </tr>
+ * <tr>
+ * <td>NULL_TYPE</td>
+ * <td>No type. The field is not encoded in Parquet.</td>
+ * </tr>
+ * <tr>
+ * <td>BOOLEAN</td>
+ * <td>BOOLEAN</td>
+ * </tr>
+ * <tr>
+ * <td>BIT</td>
+ * <td>INT32</td>
+ * </tr>
+ * <tr>
+ * <td>INT2</td>
+ * <td>INT32</td>
+ * </tr>
+ * <tr>
+ * <td>INT4</td>
+ * <td>INT32</td>
+ * </tr>
+ * <tr>
+ * <td>INT8</td>
+ * <td>INT64</td>
+ * </tr>
+ * <tr>
+ * <td>FLOAT4</td>
+ * <td>FLOAT</td>
+ * </tr>
+ * <tr>
+ * <td>FLOAT8</td>
+ * <td>DOUBLE</td>
+ * </tr>
+ * <tr>
+ * <td>CHAR</td>
+ * <td>BINARY (with OriginalType UTF8)</td>
+ * </tr>
+ * <tr>
+ * <td>TEXT</td>
+ * <td>BINARY (with OriginalType UTF8)</td>
+ * </tr>
+ * <tr>
+ * <td>PROTOBUF</td>
+ * <td>BINARY</td>
+ * </tr>
+ * <tr>
+ * <td>BLOB</td>
+ * <td>BINARY</td>
+ * </tr>
+ * <tr>
+ * <td>INET4</td>
+ * <td>BINARY</td>
+ * </tr>
+ * </table>
+ *
+ * <p>
+ * Because Tajo fields can be NULL, all Parquet fields are marked as optional.
+ * </p>
+ *
+ * <p>
+ * The conversion from Tajo to Parquet is lossy: the original Tajo types
+ * cannot be recovered from the Parquet schema alone. Parquet files are
+ * therefore read using the Tajo schema stored in the Tajo catalog, which
+ * was defined when the table was created.
+ * </p>
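+ *
+ * <p>
+ * For example, {@code PROTOBUF}, {@code BLOB}, and {@code INET4} all map to
+ * {@code BINARY}, so the Parquet schema alone cannot distinguish them.
+ * </p>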
+ */
+
+package org.apache.tajo.storage.parquet;