You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 06:59:56 UTC

[22/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
deleted file mode 100644
index 69b76c4..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter;
-import parquet.hadoop.metadata.CompressionCodecName;
-
-import java.io.IOException;
-
-/**
- * Tajo implementation of {@link ParquetWriter} to write Tajo records to a
- * Parquet file. Users should use {@link ParquetAppender} and not this class
- * directly.
- */
-public class TajoParquetWriter extends ParquetWriter<Tuple> {
-  /**
-   * Create a new TajoParquetWriter
-   *
-   * @param file The file name to write to.
-   * @param schema The Tajo schema of the table.
-   * @param compressionCodecName Compression codec to use, or
-   *                             CompressionCodecName.UNCOMPRESSED.
-   * @param blockSize The block size threshold.
-   * @param pageSize See parquet write up. Blocks are subdivided into pages
-   *                 for alignment.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file,
-                           Schema schema,
-                           CompressionCodecName compressionCodecName,
-                           int blockSize,
-                           int pageSize) throws IOException {
-    super(file,
-          new TajoWriteSupport(schema),
-          compressionCodecName,
-          blockSize,
-          pageSize);
-  }
-
-  /**
-   * Create a new TajoParquetWriter.
-   *
-   * @param file The file name to write to.
-   * @param schema The Tajo schema of the table.
-   * @param compressionCodecName Compression codec to use, or
-   *                             CompressionCodecName.UNCOMPRESSED.
-   * @param blockSize The block size threshold.
-   * @param pageSize See parquet write up. Blocks are subdivided into pages
-   *                 for alignment.
-   * @param enableDictionary Whether to use a dictionary to compress columns.
-   * @param validating Whether to turn on validation.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file,
-                           Schema schema,
-                           CompressionCodecName compressionCodecName,
-                           int blockSize,
-                           int pageSize,
-                           boolean enableDictionary,
-                           boolean validating) throws IOException {
-    super(file,
-          new TajoWriteSupport(schema),
-          compressionCodecName,
-          blockSize,
-          pageSize,
-          enableDictionary,
-          validating);
-  }
-
-  /**
-   * Creates a new TajoParquetWriter. The default block size is 128 MB.
-   * The default page size is 1 MB. Default compression is no compression.
-   *
-   * @param file The Path of the file to write to.
-   * @param schema The Tajo schema of the table.
-   * @throws IOException
-   */
-  public TajoParquetWriter(Path file, Schema schema) throws IOException {
-    this(file,
-         schema,
-         CompressionCodecName.UNCOMPRESSED,
-         DEFAULT_BLOCK_SIZE,
-         DEFAULT_PAGE_SIZE);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
deleted file mode 100644
index 269f782..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import java.util.Map;
-
-import parquet.Log;
-import parquet.hadoop.api.InitContext;
-import parquet.hadoop.api.ReadSupport;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.json.CatalogGsonHelper;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Tajo implementation of {@link ReadSupport} for {@link Tuple}s.
- * Users should use {@link ParquetScanner} and not this class directly.
- */
-public class TajoReadSupport extends ReadSupport<Tuple> {
-  private static final Log LOG = Log.getLog(TajoReadSupport.class);
-
-  private Schema readSchema;
-  private Schema requestedSchema;
-
-  /**
-   * Creates a new TajoReadSupport.
-   *
-   * @param readSchema The Tajo schema of the table.
-   * @param requestedSchema The Tajo schema of the projection passed down by ParquetScanner.
-   */
-  public TajoReadSupport(Schema readSchema, Schema requestedSchema) {
-    super();
-    this.readSchema = readSchema;
-    this.requestedSchema = requestedSchema;
-  }
-
-  /**
-   * Creates a new TajoReadSupport.
-   *
-   * @param readSchema The schema of the table.
-   */
-  public TajoReadSupport(Schema readSchema) {
-    super();
-    this.readSchema = readSchema;
-    this.requestedSchema = readSchema;
-  }
-
-  /**
-   * Initializes the ReadSupport.
-   *
-   * @param context The InitContext.
-   * @return A ReadContext that defines how to read the file.
-   */
-  @Override
-  public ReadContext init(InitContext context) {
-    if (requestedSchema == null) {
-      throw new RuntimeException("requestedSchema is null.");
-    }
-    MessageType requestedParquetSchema =
-      new TajoSchemaConverter().convert(requestedSchema);
-    LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
-    return new ReadContext(requestedParquetSchema);
-  }
-
-  /**
-   * Prepares for read.
-   *
-   * @param configuration The job configuration.
-   * @param keyValueMetaData App-specific metadata from the file.
-   * @param fileSchema The schema of the Parquet file.
-   * @param readContext Returned by the init method.
-   */
-  @Override
-  public RecordMaterializer<Tuple> prepareForRead(
-      Configuration configuration,
-      Map<String, String> keyValueMetaData,
-      MessageType fileSchema,
-      ReadContext readContext) {
-    MessageType parquetRequestedSchema = readContext.getRequestedSchema();
-    return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
deleted file mode 100644
index a091eac..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import parquet.io.api.Binary;
-import parquet.io.api.Converter;
-import parquet.io.api.GroupConverter;
-import parquet.io.api.PrimitiveConverter;
-import parquet.schema.GroupType;
-import parquet.schema.Type;
-
-import java.nio.ByteBuffer;
-
-/**
- * Converter to convert a Parquet record into a Tajo Tuple.
- */
-public class TajoRecordConverter extends GroupConverter {
-  private final GroupType parquetSchema;
-  private final Schema tajoReadSchema;
-  private final int[] projectionMap;
-  private final int tupleSize;
-
-  private final Converter[] converters;
-
-  private Tuple currentTuple;
-
-  /**
-   * Creates a new TajoRecordConverter.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   * @param projectionMap An array mapping the projection column to the column
-   *                      index in the table.
-   */
-  public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema,
-                             int[] projectionMap) {
-    this.parquetSchema = parquetSchema;
-    this.tajoReadSchema = tajoReadSchema;
-    this.projectionMap = projectionMap;
-    this.tupleSize = tajoReadSchema.size();
-
-    // The projectionMap.length does not match parquetSchema.getFieldCount()
-    // when the projection contains NULL_TYPE columns. We will skip over the
-    // NULL_TYPE columns when we construct the converters and populate the
-    // NULL_TYPE columns with NullDatums in end().
-    int index = 0;
-    this.converters = new Converter[parquetSchema.getFieldCount()];
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Type type = parquetSchema.getType(index);
-      converters[index] = newConverter(column, type, new ParentValueContainer() {
-        @Override
-        void add(Object value) {
-          TajoRecordConverter.this.set(projectionIndex, value);
-        }
-      });
-      ++index;
-    }
-  }
-
-  private void set(int index, Object value) {
-    currentTuple.put(index, (Datum)value);
-  }
-
-  private Converter newConverter(Column column, Type type,
-                                 ParentValueContainer parent) {
-    DataType dataType = column.getDataType();
-    switch (dataType.getType()) {
-      case BOOLEAN:
-        return new FieldBooleanConverter(parent);
-      case BIT:
-        return new FieldBitConverter(parent);
-      case CHAR:
-        return new FieldCharConverter(parent);
-      case INT2:
-        return new FieldInt2Converter(parent);
-      case INT4:
-        return new FieldInt4Converter(parent);
-      case INT8:
-        return new FieldInt8Converter(parent);
-      case FLOAT4:
-        return new FieldFloat4Converter(parent);
-      case FLOAT8:
-        return new FieldFloat8Converter(parent);
-      case INET4:
-        return new FieldInet4Converter(parent);
-      case INET6:
-        throw new RuntimeException("No converter for INET6");
-      case TEXT:
-        return new FieldTextConverter(parent);
-      case PROTOBUF:
-        return new FieldProtobufConverter(parent, dataType);
-      case BLOB:
-        return new FieldBlobConverter(parent);
-      case NULL_TYPE:
-        throw new RuntimeException("No converter for NULL_TYPE.");
-      default:
-        throw new RuntimeException("Unsupported data type");
-    }
-  }
-
-  /**
-   * Gets the converter for a specific field.
-   *
-   * @param fieldIndex Index of the field in the projection.
-   * @return The converter for the field.
-   */
-  @Override
-  public Converter getConverter(int fieldIndex) {
-    return converters[fieldIndex];
-  }
-
-  /**
-   * Called before processing fields. This method allocates a fresh tuple for
-   * the record; NULL and NULL_TYPE fields are filled with NullDatum in end().
-   */
-  @Override
-  public void start() {
-    currentTuple = new VTuple(tupleSize);
-  }
-
-  /**
-   * Called after all fields are processed; sets unset or NULL_TYPE projected fields to NullDatum.
-   */
-  @Override
-  public void end() {
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.get(projectionIndex) == null) {
-        set(projectionIndex, NullDatum.get());
-      }
-    }
-  }
-
-  /**
-   * Returns the current record converted by this converter.
-   *
-   * @return The current record.
-   */
-  public Tuple getCurrentRecord() {
-    return currentTuple;
-  }
-
-  static abstract class ParentValueContainer {
-    /**
-     * Adds the value to the parent.
-     *
-     * @param value The value to add.
-     */
-    abstract void add(Object value);
-  }
-
-  static final class FieldBooleanConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBooleanConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBoolean(boolean value) {
-      parent.add(DatumFactory.createBool(value));
-    }
-  }
-
-  static final class FieldBitConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBitConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createBit((byte)(value & 0xff)));
-    }
-  }
-
-  static final class FieldCharConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldCharConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createChar(value.getBytes()));
-    }
-  }
-
-  static final class FieldInt2Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt2Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt2((short)value));
-    }
-  }
-
-  static final class FieldInt4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt4(value));
-    }
-  }
-
-  static final class FieldInt8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInt8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createInt8(value));
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createInt8(Long.valueOf(value)));
-    }
-  }
-
-  static final class FieldFloat4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat4(Float.valueOf(value)));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat4(value));
-    }
-  }
-
-  static final class FieldFloat8Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldFloat8Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addInt(int value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addLong(long value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addFloat(float value) {
-      parent.add(DatumFactory.createFloat8(Double.valueOf(value)));
-    }
-
-    @Override
-    final public void addDouble(double value) {
-      parent.add(DatumFactory.createFloat8(value));
-    }
-  }
-
-  static final class FieldInet4Converter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldInet4Converter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createInet4(value.getBytes()));
-    }
-  }
-
-  static final class FieldTextConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldTextConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(DatumFactory.createText(value.getBytes()));
-    }
-  }
-
-  static final class FieldBlobConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-
-    public FieldBlobConverter(ParentValueContainer parent) {
-      this.parent = parent;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes())));
-    }
-  }
-
-  static final class FieldProtobufConverter extends PrimitiveConverter {
-    private final ParentValueContainer parent;
-    private final DataType dataType;
-
-    public FieldProtobufConverter(ParentValueContainer parent,
-                                  DataType dataType) {
-      this.parent = parent;
-      this.dataType = dataType;
-    }
-
-    @Override
-    final public void addBinary(Binary value) {
-      try {
-        ProtobufDatumFactory factory =
-            ProtobufDatumFactory.get(dataType.getCode());
-        Message.Builder builder = factory.newBuilder();
-        builder.mergeFrom(value.getBytes());
-        parent.add(factory.createDatum(builder));
-      } catch (InvalidProtocolBufferException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
deleted file mode 100644
index e31828c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import parquet.io.api.GroupConverter;
-import parquet.io.api.RecordMaterializer;
-import parquet.schema.MessageType;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.storage.Tuple;
-
-/**
- * Materializes a Tajo Tuple from a stream of Parquet data.
- */
-class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
-  private final TajoRecordConverter root;
-
-  /**
-   * Creates a new TajoRecordMaterializer.
-   *
-   * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoSchema The Tajo schema of the projection.
-   * @param tajoReadSchema The Tajo schema of the table.
-   */
-  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
-                                Schema tajoReadSchema) {
-    int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
-    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
-                                        projectionMap);
-  }
-
-  private int[] getProjectionMap(Schema schema, Schema projection) {
-    Column[] targets = projection.toArray();
-    int[] projectionMap = new int[targets.length];
-    for (int i = 0; i < targets.length; ++i) {
-      int tid = schema.getColumnId(targets[i].getQualifiedName());
-      projectionMap[i] = tid;
-    }
-    return projectionMap;
-  }
-
-  /**
-   * Returns the current record being materialized.
-   *
-   * @return The record being materialized.
-   */
-  @Override
-  public Tuple getCurrentRecord() {
-    return root.getCurrentRecord();
-  }
-
-  /**
-   * Returns the root converter.
-   *
-   * @return The root converter
-   */
-  @Override
-  public GroupConverter getRootConverter() {
-    return root;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
deleted file mode 100644
index 2592231..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import parquet.schema.MessageType;
-import parquet.schema.OriginalType;
-import parquet.schema.PrimitiveType;
-import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Converts between Parquet and Tajo schemas. See package documentation for
- * details on the mapping.
- */
-public class TajoSchemaConverter {
-  private static final String TABLE_SCHEMA = "table_schema";
-
-  /**
-   * Creates a new TajoSchemaConverter.
-   */
-  public TajoSchemaConverter() {
-  }
-
-  /**
-   * Converts a Parquet schema to a Tajo schema.
-   *
-   * @param parquetSchema The Parquet schema to convert.
-   * @return The resulting Tajo schema.
-   */
-  public Schema convert(MessageType parquetSchema) {
-    return convertFields(parquetSchema.getFields());
-  }
-
-  private Schema convertFields(List<Type> parquetFields) {
-    List<Column> columns = new ArrayList<Column>();
-    for (int i = 0; i < parquetFields.size(); ++i) {
-      Type fieldType = parquetFields.get(i);
-      if (fieldType.isRepetition(Type.Repetition.REPEATED)) {
-        throw new RuntimeException("REPEATED not supported outside LIST or" +
-            " MAP. Type: " + fieldType);
-      }
-      columns.add(convertField(fieldType));
-    }
-    Column[] columnsArray = new Column[columns.size()];
-    columnsArray = columns.toArray(columnsArray);
-    return new Schema(columnsArray);
-  }
-
-  private Column convertField(final Type fieldType) {
-    if (fieldType.isPrimitive()) {
-      return convertPrimitiveField(fieldType);
-    } else {
-      return convertComplexField(fieldType);
-    }
-  }
-
-  private Column convertPrimitiveField(final Type fieldType) {
-    final String fieldName = fieldType.getName();
-    final PrimitiveTypeName parquetPrimitiveTypeName =
-        fieldType.asPrimitiveType().getPrimitiveTypeName();
-    final OriginalType originalType = fieldType.getOriginalType();
-    return parquetPrimitiveTypeName.convert(
-        new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() {
-      @Override
-      public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.BOOLEAN);
-      }
-
-      @Override
-      public Column convertINT32(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.INT4);
-      }
-
-      @Override
-      public Column convertINT64(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.INT8);
-      }
-
-      @Override
-      public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.FLOAT4);
-      }
-
-      @Override
-      public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.FLOAT8);
-      }
-
-      @Override
-      public Column convertFIXED_LEN_BYTE_ARRAY(
-          PrimitiveTypeName primitiveTypeName) {
-        return new Column(fieldName, TajoDataTypes.Type.BLOB);
-      }
-
-      @Override
-      public Column convertBINARY(PrimitiveTypeName primitiveTypeName) {
-        if (originalType == OriginalType.UTF8) {
-          return new Column(fieldName, TajoDataTypes.Type.TEXT);
-        } else {
-          return new Column(fieldName, TajoDataTypes.Type.BLOB);
-        }
-      }
-
-      @Override
-      public Column convertINT96(PrimitiveTypeName primitiveTypeName) {
-        throw new RuntimeException("Converting from INT96 not supported.");
-      }
-    });
-  }
-
-  private Column convertComplexField(final Type fieldType) {
-    throw new RuntimeException("Complex types not supported.");
-  }
-
-  /**
-   * Converts a Tajo schema to a Parquet schema.
-   *
-   * @param tajoSchema The Tajo schema to convert.
-   * @return The resulting Parquet schema.
-   */
-  public MessageType convert(Schema tajoSchema) {
-    List<Type> types = new ArrayList<Type>();
-    for (int i = 0; i < tajoSchema.size(); ++i) {
-      Column column = tajoSchema.getColumn(i);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      types.add(convertColumn(column));
-    }
-    return new MessageType(TABLE_SCHEMA, types);
-  }
-
-  private Type convertColumn(Column column) {
-    TajoDataTypes.Type type = column.getDataType().getType();
-    switch (type) {
-      case BOOLEAN:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BOOLEAN);
-      case BIT:
-      case INT2:
-      case INT4:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.INT32);
-      case INT8:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.INT64);
-      case FLOAT4:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.FLOAT);
-      case FLOAT8:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.DOUBLE);
-      case CHAR:
-      case TEXT:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY,
-                         OriginalType.UTF8);
-      case PROTOBUF:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      case BLOB:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      case INET4:
-      case INET6:
-        return primitive(column.getSimpleName(),
-                         PrimitiveType.PrimitiveTypeName.BINARY);
-      default:
-        throw new RuntimeException("Cannot convert Tajo type: " + type);
-    }
-  }
-
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive) {
-    return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null);
-  }
-
-  private PrimitiveType primitive(String name,
-                                  PrimitiveType.PrimitiveTypeName primitive,
-                                  OriginalType originalType) {
-    return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name,
-                             originalType);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
deleted file mode 100644
index 8651131..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.Tuple;
-import parquet.hadoop.api.WriteSupport;
-import parquet.io.api.Binary;
-import parquet.io.api.RecordConsumer;
-import parquet.schema.GroupType;
-import parquet.schema.MessageType;
-import parquet.schema.Type;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Tajo implementation of {@link WriteSupport} for {@link Tuple}s.
- * Users should use {@link ParquetAppender} and not this class directly.
- */
-public class TajoWriteSupport extends WriteSupport<Tuple> {
-  private RecordConsumer recordConsumer;
-  private MessageType rootSchema;
-  private Schema rootTajoSchema;
-
-  /**
-   * Creates a new TajoWriteSupport.
-   *
-   * @param tajoSchema The Tajo schema for the table.
-   */
-  public TajoWriteSupport(Schema tajoSchema) {
-    this.rootSchema = new TajoSchemaConverter().convert(tajoSchema);
-    this.rootTajoSchema = tajoSchema;
-  }
-
-  /**
-   * Initializes the WriteSupport.
-   *
-   * @param configuration The job's configuration.
-   * @return A WriteContext that describes how to write the file.
-   */
-  @Override
-  public WriteContext init(Configuration configuration) {
-    Map<String, String> extraMetaData = new HashMap<String, String>();
-    return new WriteContext(rootSchema, extraMetaData);
-  }
-
-  /**
-   * Called once per row group.
-   *
-   * @param recordConsumer The {@link RecordConsumer} to write to.
-   */
-  @Override
-  public void prepareForWrite(RecordConsumer recordConsumer) {
-    this.recordConsumer = recordConsumer;
-  }
-
-  /**
-   * Writes a Tuple to the file.
-   *
-   * @param tuple The Tuple to write to the file.
-   */
-  @Override
-  public void write(Tuple tuple) {
-    recordConsumer.startMessage();
-    writeRecordFields(rootSchema, rootTajoSchema, tuple);
-    recordConsumer.endMessage();
-  }
-
-  private void writeRecordFields(GroupType schema, Schema tajoSchema,
-                                 Tuple tuple) {
-    List<Type> fields = schema.getFields();
-    // Parquet ignores Tajo NULL_TYPE columns, so the index may differ.
-    int index = 0;
-    for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) {
-      Column column = tajoSchema.getColumn(tajoIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
-        continue;
-      }
-      Datum datum = tuple.get(tajoIndex);
-      Type fieldType = fields.get(index);
-      if (!tuple.isNull(tajoIndex)) {
-        recordConsumer.startField(fieldType.getName(), index);
-        writeValue(fieldType, column, datum);
-        recordConsumer.endField(fieldType.getName(), index);
-      } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) {
-        throw new RuntimeException("Null-value for required field: " +
-            column.getSimpleName());
-      }
-      ++index;
-    }
-  }
-
-  private void writeValue(Type fieldType, Column column, Datum datum) {
-    switch (column.getDataType().getType()) {
-      case BOOLEAN:
-        recordConsumer.addBoolean(datum.asBool());
-        break;
-      case BIT:
-      case INT2:
-      case INT4:
-        recordConsumer.addInteger(datum.asInt4());
-        break;
-      case INT8:
-        recordConsumer.addLong(datum.asInt8());
-        break;
-      case FLOAT4:
-        recordConsumer.addFloat(datum.asFloat4());
-        break;
-      case FLOAT8:
-        recordConsumer.addDouble(datum.asFloat8());
-        break;
-      case CHAR:
-      case TEXT:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes()));
-        break;
-      case PROTOBUF:
-      case BLOB:
-      case INET4:
-      case INET6:
-        recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
-        break;
-      default:
-        break;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
deleted file mode 100644
index d7d16b7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Parquet files. Tajo schemas are
- * converted to Parquet schemas according to the following mapping of Tajo
- * and Parquet types:
- * </p>
- *
- * <table>
- *   <tr>
- *     <th>Tajo type</th>
- *     <th>Parquet type</th>
- *   </tr>
- *   <tr>
- *     <td>NULL_TYPE</td>
- *     <td>No type. The field is not encoded in Parquet.</td>
- *   </tr>
- *   <tr>
- *     <td>BOOLEAN</td>
- *     <td>BOOLEAN</td>
- *   </tr>
- *   <tr>
- *     <td>BIT</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT2</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT4</td>
- *     <td>INT32</td>
- *   </tr>
- *   <tr>
- *     <td>INT8</td>
- *     <td>INT64</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT4</td>
- *     <td>FLOAT</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT8</td>
- *     <td>DOUBLE</td>
- *   </tr>
- *   <tr>
- *     <td>CHAR</td>
- *     <td>BINARY (with OriginalType UTF8)</td>
- *   </tr>
- *   <tr>
- *     <td>TEXT</td>
- *     <td>BINARY (with OriginalType UTF8)</td>
- *   </tr>
- *   <tr>
- *     <td>PROTOBUF</td>
- *     <td>BINARY</td>
- *   </tr>
- *   <tr>
- *     <td>BLOB</td>
- *     <td>BINARY</td>
- *   </tr>
- *   <tr>
- *     <td>INET4</td>
- *     <td>BINARY</td>
- *   </tr>
- * </table>
- *
- * <p>
- * Because Tajo fields can be NULL, all Parquet fields are marked as optional.
- * </p>
- *
- * <p>
- * The conversion from Tajo to Parquet is lossy without the original Tajo
- * schema. As a result, Parquet files are read using the Tajo schema saved in
- * the Tajo catalog for the table the Parquet files belong to, which was
- * defined when the table was created.
- * </p>
- */
-
-package org.apache.tajo.storage.parquet;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
deleted file mode 100644
index 5e200a0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import com.google.common.base.Objects;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable,
- * and is able to resize without recreating new array if not necessary.
- * <p>
- *
- * Each <tt>BytesRefArrayWritable holds</tt> instance has a <i>valid</i> field,
- * which is the desired valid number of <tt>BytesRefWritable</tt> it holds.
- * <tt>resetValid</tt> can reset the valid, but it will not care the underlying
- * BytesRefWritable.
- */
-
-public class BytesRefArrayWritable implements Writable,
-    Comparable<BytesRefArrayWritable> {
-
-  private BytesRefWritable[] bytesRefWritables = null;
-
-  private int valid = 0;
-
-  /**
-   * Constructs an empty array with the specified capacity.
-   *
-   * @param capacity
-   *          initial capacity
-   * @exception IllegalArgumentException
-   *              if the specified initial capacity is negative
-   */
-  public BytesRefArrayWritable(int capacity) {
-    if (capacity < 0) {
-      throw new IllegalArgumentException("Capacity can not be negative.");
-    }
-    bytesRefWritables = new BytesRefWritable[0];
-    ensureCapacity(capacity);
-  }
-
-  /**
-   * Constructs an empty array with a capacity of ten.
-   */
-  public BytesRefArrayWritable() {
-    this(10);
-  }
-
-  /**
-   * Returns the number of valid elements.
-   *
-   * @return the number of valid elements
-   */
-  public int size() {
-    return valid;
-  }
-
-  /**
-   * Gets the BytesRefWritable at the specified position. Make sure the position
-   * is valid by first call resetValid.
-   *
-   * @param index
-   *          the position index, starting from zero
-   * @throws IndexOutOfBoundsException
-   */
-  public BytesRefWritable get(int index) {
-    if (index >= valid) {
-      throw new IndexOutOfBoundsException(
-          "This BytesRefArrayWritable only has " + valid + " valid values.");
-    }
-    return bytesRefWritables[index];
-  }
-
-  /**
-   * Gets the BytesRefWritable at the specified position without checking.
-   *
-   * @param index
-   *          the position index, starting from zero
-   * @throws IndexOutOfBoundsException
-   */
-  public BytesRefWritable unCheckedGet(int index) {
-    return bytesRefWritables[index];
-  }
-
-  /**
-   * Set the BytesRefWritable at the specified position with the specified
-   * BytesRefWritable.
-   *
-   * @param index
-   *          index position
-   * @param bytesRefWritable
-   *          the new element
-   * @throws IllegalArgumentException
-   *           if the specified new element is null
-   */
-  public void set(int index, BytesRefWritable bytesRefWritable) {
-    if (bytesRefWritable == null) {
-      throw new IllegalArgumentException("Can not assign null.");
-    }
-    ensureCapacity(index + 1);
-    bytesRefWritables[index] = bytesRefWritable;
-    if (valid <= index) {
-      valid = index + 1;
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int compareTo(BytesRefArrayWritable other) {
-    if (other == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    if (this == other) {
-      return 0;
-    }
-    int sizeDiff = valid - other.valid;
-    if (sizeDiff != 0) {
-      return sizeDiff;
-    }
-    for (int i = 0; i < valid; i++) {
-      if (other.contains(bytesRefWritables[i])) {
-        continue;
-      } else {
-        return 1;
-      }
-    }
-    return 0;
-  }
-
-  @Override
-  public int hashCode(){
-    return Objects.hashCode(bytesRefWritables);
-  }
-  /**
-   * Returns <tt>true</tt> if this instance contains one or more the specified
-   * BytesRefWritable.
-   *
-   * @param bytesRefWritable
-   *          BytesRefWritable element to be tested
-   * @return <tt>true</tt> if contains the specified element
-   * @throws IllegalArgumentException
-   *           if the specified element is null
-   */
-  public boolean contains(BytesRefWritable bytesRefWritable) {
-    if (bytesRefWritable == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    for (int i = 0; i < valid; i++) {
-      if (bytesRefWritables[i].equals(bytesRefWritable)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (o == null || !(o instanceof BytesRefArrayWritable)) {
-      return false;
-    }
-    return compareTo((BytesRefArrayWritable) o) == 0;
-  }
-
-  /**
-   * Removes all elements.
-   */
-  public void clear() {
-    valid = 0;
-  }
-
-  /**
-   * enlarge the capacity if necessary, to ensure that it can hold the number of
-   * elements specified by newValidCapacity argument. It will also narrow the
-   * valid capacity when needed. Notice: it only enlarge or narrow the valid
-   * capacity with no care of the already stored invalid BytesRefWritable.
-   *
-   * @param newValidCapacity
-   *          the desired capacity
-   */
-  public void resetValid(int newValidCapacity) {
-    ensureCapacity(newValidCapacity);
-    valid = newValidCapacity;
-  }
-
-  protected void ensureCapacity(int newCapacity) {
-    int size = bytesRefWritables.length;
-    if (size < newCapacity) {
-      bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity);
-      while (size < newCapacity) {
-        bytesRefWritables[size] = new BytesRefWritable();
-        size++;
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    int count = in.readInt();
-    ensureCapacity(count);
-    for (int i = 0; i < count; i++) {
-      bytesRefWritables[i].readFields(in);
-    }
-    valid = count;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(valid);
-
-    for (int i = 0; i < valid; i++) {
-      BytesRefWritable cu = bytesRefWritables[i];
-      cu.write(out);
-    }
-  }
-
-  static {
-    WritableFactories.setFactory(BytesRefArrayWritable.class,
-        new WritableFactory() {
-
-          @Override
-          public Writable newInstance() {
-            return new BytesRefArrayWritable();
-          }
-
-        });
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
deleted file mode 100644
index c83b505..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.io.WritableFactories;
-import org.apache.hadoop.io.WritableFactory;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * <tt>BytesRefWritable</tt> referenced a section of byte array. It can be used
- * to avoid unnecessary byte copy.
- */
-public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> {
-
-  private static final byte[] EMPTY_BYTES = new byte[0];
-  public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable();
-
-  int start = 0;
-  int length = 0;
-  byte[] bytes = null;
-
-  LazyDecompressionCallback lazyDecompressObj;
-
-  /**
-   * Create a zero-size bytes.
-   */
-  public BytesRefWritable() {
-    this(EMPTY_BYTES);
-  }
-
-  /**
-   * Create a BytesRefWritable with <tt>length</tt> bytes.
-   */
-  public BytesRefWritable(int length) {
-    assert length > 0;
-    this.length = length;
-    bytes = new byte[this.length];
-    start = 0;
-  }
-
-  /**
-   * Create a BytesRefWritable referenced to the given bytes.
-   */
-  public BytesRefWritable(byte[] bytes) {
-    this.bytes = bytes;
-    length = bytes.length;
-    start = 0;
-  }
-
-  /**
-   * Create a BytesRefWritable referenced to one section of the given bytes. The
-   * section is determined by argument <tt>offset</tt> and <tt>len</tt>.
-   */
-  public BytesRefWritable(byte[] data, int offset, int len) {
-    bytes = data;
-    start = offset;
-    length = len;
-  }
-
-  /**
-   * Create a BytesRefWritable referenced to one section of the given bytes. The
-   * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback
-   * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to
-   * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and
-   * <tt>len</tt> after uncompressing the data.
-   */
-  public BytesRefWritable(LazyDecompressionCallback lazyDecompressData,
-                          int offset, int len) {
-    lazyDecompressObj = lazyDecompressData;
-    start = offset;
-    length = len;
-  }
-
-  private void lazyDecompress() throws IOException {
-    if (bytes == null && lazyDecompressObj != null) {
-      bytes = lazyDecompressObj.decompress();
-    }
-  }
-
-  /**
-   * Returns a copy of the underlying bytes referenced by this instance.
-   *
-   * @return a new copied byte array
-   * @throws IOException
-   */
-  public byte[] getBytesCopy() throws IOException {
-    lazyDecompress();
-    byte[] bb = new byte[length];
-    System.arraycopy(bytes, start, bb, 0, length);
-    return bb;
-  }
-
-  /**
-   * Returns the underlying bytes.
-   *
-   * @throws IOException
-   */
-  public byte[] getData() throws IOException {
-    lazyDecompress();
-    return bytes;
-  }
-
-  /**
-   * readFields() will corrupt the array. So use the set method whenever
-   * possible.
-   *
-   * @see #readFields(DataInput)
-   */
-  public void set(byte[] newData, int offset, int len) {
-    bytes = newData;
-    start = offset;
-    length = len;
-    lazyDecompressObj = null;
-  }
-
-  /**
-   * readFields() will corrupt the array. So use the set method whenever
-   * possible.
-   *
-   * @see #readFields(DataInput)
-   */
-  public void set(LazyDecompressionCallback newData, int offset, int len) {
-    bytes = null;
-    start = offset;
-    length = len;
-    lazyDecompressObj = newData;
-  }
-
-  public void writeDataTo(DataOutput out) throws IOException {
-    lazyDecompress();
-    out.write(bytes, start, length);
-  }
-
-  /**
-   * Always reuse the bytes array if length of bytes array is equal or greater
-   * to the current record, otherwise create a new one. readFields will corrupt
-   * the array. Please use set() whenever possible.
-   *
-   * @see #set(byte[], int, int)
-   */
-  public void readFields(DataInput in) throws IOException {
-    int len = in.readInt();
-    if (len > bytes.length) {
-      bytes = new byte[len];
-    }
-    start = 0;
-    length = len;
-    in.readFully(bytes, start, length);
-  }
-
-  /** {@inheritDoc} */
-  public void write(DataOutput out) throws IOException {
-    lazyDecompress();
-    out.writeInt(length);
-    out.write(bytes, start, length);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public int hashCode() {
-    return super.hashCode();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder(3 * length);
-    for (int idx = start; idx < length; idx++) {
-      // if not the first, put a blank separator in
-      if (idx != 0) {
-        sb.append(' ');
-      }
-      String num = Integer.toHexString(0xff & bytes[idx]);
-      // if it is only one digit, add a leading 0.
-      if (num.length() < 2) {
-        sb.append('0');
-      }
-      sb.append(num);
-    }
-    return sb.toString();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public int compareTo(BytesRefWritable other) {
-    if (other == null) {
-      throw new IllegalArgumentException("Argument can not be null.");
-    }
-    if (this == other) {
-      return 0;
-    }
-    try {
-      return WritableComparator.compareBytes(getData(), start, getLength(),
-          other.getData(), other.start, other.getLength());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public boolean equals(Object right_obj) {
-    if (right_obj == null || !(right_obj instanceof BytesRefWritable)) {
-      return false;
-    }
-    return compareTo((BytesRefWritable) right_obj) == 0;
-  }
-
-  static {
-    WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() {
-
-      @Override
-      public Writable newInstance() {
-        return new BytesRefWritable();
-      }
-
-    });
-  }
-
-  public int getLength() {
-    return length;
-  }
-
-  public int getStart() {
-    return start;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
deleted file mode 100644
index 352776f..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.StringUtils;
-
-import java.util.ArrayList;
-
-/**
- * ColumnProjectionUtils.
- *
- */
-public final class ColumnProjectionUtils {
-
-  public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids";
-
-  /**
-   * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
-   * is included in the list, RCFile's reader will not skip its value.
-   *
-   */
-  public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) {
-    String id = toReadColumnIDString(ids);
-    setReadColumnIDConf(conf, id);
-  }
-
-  /**
-   * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column
-   * is included in the list, RCFile's reader will not skip its value.
-   *
-   */
-  public static void appendReadColumnIDs(Configuration conf,
-                                         ArrayList<Integer> ids) {
-    String id = toReadColumnIDString(ids);
-    if (id != null) {
-      String old = conf.get(READ_COLUMN_IDS_CONF_STR, null);
-      String newConfStr = id;
-      if (old != null) {
-        newConfStr = newConfStr + StringUtils.COMMA_STR + old;
-      }
-
-      setReadColumnIDConf(conf, newConfStr);
-    }
-  }
-
-  private static void setReadColumnIDConf(Configuration conf, String id) {
-    if (id == null || id.length() <= 0) {
-      conf.set(READ_COLUMN_IDS_CONF_STR, "");
-      return;
-    }
-
-    conf.set(READ_COLUMN_IDS_CONF_STR, id);
-  }
-
-  private static String toReadColumnIDString(ArrayList<Integer> ids) {
-    String id = null;
-    if (ids != null) {
-      for (int i = 0; i < ids.size(); i++) {
-        if (i == 0) {
-          id = "" + ids.get(i);
-        } else {
-          id = id + StringUtils.COMMA_STR + ids.get(i);
-        }
-      }
-    }
-    return id;
-  }
-
-  /**
-   * Returns an array of column ids(start from zero) which is set in the given
-   * parameter <tt>conf</tt>.
-   */
-  public static ArrayList<Integer> getReadColumnIDs(Configuration conf) {
-    if (conf == null) {
-      return new ArrayList<Integer>(0);
-    }
-    String skips = conf.get(READ_COLUMN_IDS_CONF_STR, "");
-    String[] list = StringUtils.split(skips);
-    ArrayList<Integer> result = new ArrayList<Integer>(list.length);
-    for (String element : list) {
-      // it may contain duplicates, remove duplicates
-      Integer toAdd = Integer.parseInt(element);
-      if (!result.contains(toAdd)) {
-        result.add(toAdd);
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Clears the read column ids set in the conf, and will read all columns.
-   */
-  public static void setFullyReadColumns(Configuration conf) {
-    conf.set(READ_COLUMN_IDS_CONF_STR, "");
-  }
-
-  private ColumnProjectionUtils() {
-    // prevent instantiation
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
deleted file mode 100644
index 707d55a..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import java.io.IOException;
-
-/**
- * Used to call back lazy decompression process.
- *
- * @see BytesRefWritable
- */
-public interface LazyDecompressionCallback {
-
-  byte[] decompress() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
deleted file mode 100644
index bb6af22..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import java.io.ByteArrayInputStream;
-
-/**
- * A thread-not-safe version of ByteArrayInputStream, which removes all
- * synchronized modifiers.
- */
-public class NonSyncByteArrayInputStream extends ByteArrayInputStream {
-  public NonSyncByteArrayInputStream() {
-    super(new byte[] {});
-  }
-
-  public NonSyncByteArrayInputStream(byte[] bs) {
-    super(bs);
-  }
-
-  public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) {
-    super(buf, offset, length);
-  }
-
-  public void reset(byte[] input, int start, int length) {
-    buf = input;
-    count = start + length;
-    mark = start;
-    pos = start;
-  }
-
-  public int getPosition() {
-    return pos;
-  }
-
-  public int getLength() {
-    return count;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int read() {
-    return (pos < count) ? (buf[pos++] & 0xff) : -1;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int read(byte b[], int off, int len) {
-    if (b == null) {
-      throw new NullPointerException();
-    } else if (off < 0 || len < 0 || len > b.length - off) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (pos >= count) {
-      return -1;
-    }
-    if (pos + len > count) {
-      len = count - pos;
-    }
-    if (len <= 0) {
-      return 0;
-    }
-    System.arraycopy(buf, pos, b, off, len);
-    pos += len;
-    return len;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public long skip(long n) {
-    if (pos + n > count) {
-      n = count - pos;
-    }
-    if (n < 0) {
-      return 0;
-    }
-    pos += n;
-    return n;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int available() {
-    return count - pos;
-  }
-
-  public void seek(int pos) {
-    this.pos = pos;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
deleted file mode 100644
index 53a3dca..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * A thread-not-safe version of ByteArrayOutputStream, which removes all
- * synchronized modifiers.
- */
-public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream {
-  public NonSyncByteArrayOutputStream(int size) {
-    super(size);
-  }
-
-  public NonSyncByteArrayOutputStream() {
-    super(64 * 1024);
-  }
-
-  public byte[] getData() {
-    return buf;
-  }
-
-  public int getLength() {
-    return count;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void reset() {
-    count = 0;
-  }
-
-  public void write(DataInput in, int length) throws IOException {
-    enLargeBuffer(length);
-    in.readFully(buf, count, length);
-    count += length;
-  }
-
-  private byte[] vLongBytes = new byte[9];
-
-  public int writeVLongToByteArray(byte[] bytes, int offset, long l) {
-    if (l >= -112 && l <= 127) {
-      bytes[offset] = (byte) l;
-      return 1;
-    }
-
-    int len = -112;
-    if (l < 0) {
-      l ^= -1L; // take one's complement'
-      len = -120;
-    }
-
-    long tmp = l;
-    while (tmp != 0) {
-      tmp = tmp >> 8;
-      len--;
-    }
-
-    bytes[offset++] = (byte) len;
-    len = (len < -120) ? -(len + 120) : -(len + 112);
-
-    for (int idx = len; idx != 0; idx--) {
-      int shiftbits = (idx - 1) * 8;
-      bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits);
-    }
-    return 1 + len;
-  }
-
-  public int writeVLong(long l) {
-    int len = writeVLongToByteArray(vLongBytes, 0, l);
-    write(vLongBytes, 0, len);
-    return len;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(int b) {
-    enLargeBuffer(1);
-    buf[count] = (byte) b;
-    count += 1;
-  }
-
-  private int enLargeBuffer(int increment) {
-    int temp = count + increment;
-    int newLen = temp;
-    if (temp > buf.length) {
-      if ((buf.length << 1) > temp) {
-        newLen = buf.length << 1;
-      }
-      byte newbuf[] = new byte[newLen];
-      System.arraycopy(buf, 0, newbuf, 0, count);
-      buf = newbuf;
-    }
-    return newLen;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void write(byte b[], int off, int len) {
-    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
-        || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException();
-    } else if (len == 0) {
-      return;
-    }
-    enLargeBuffer(len);
-    System.arraycopy(b, off, buf, count, len);
-    count += len;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public void writeTo(OutputStream out) throws IOException {
-    out.write(buf, 0, count);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
deleted file mode 100644
index 46745ab..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.hadoop.fs.Seekable;
-
-import java.io.*;
-
-/**
- * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all
- * synchronized modifiers.
- */
-public class NonSyncDataInputBuffer extends FilterInputStream implements
-    DataInput, Seekable {
-
-  private final NonSyncByteArrayInputStream buffer;
-
-  byte[] buff = new byte[16];
-
-  /** Constructs a new empty buffer. */
-  public NonSyncDataInputBuffer() {
-    this(new NonSyncByteArrayInputStream());
-  }
-
-  private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) {
-    super(buffer);
-    this.buffer = buffer;
-  }
-
-  /** Resets the data that the buffer reads. */
-  public void reset(byte[] input, int length) {
-    buffer.reset(input, 0, length);
-  }
-
-  /** Resets the data that the buffer reads. */
-  public void reset(byte[] input, int start, int length) {
-    buffer.reset(input, start, length);
-  }
-
-  /** Returns the current position in the input. */
-  public int getPosition() {
-    return buffer.getPosition();
-  }
-
-  /** Returns the length of the input. */
-  public int getLength() {
-    return buffer.getLength();
-  }
-
-  /**
-   * Reads bytes from the source stream into the byte array <code>buffer</code>.
-   * The number of bytes actually read is returned.
-   *
-   * @param buffer
-   *          the buffer to read bytes into
-   * @return the number of bytes actually read or -1 if end of stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  @Override
-  public final int read(byte[] buffer) throws IOException {
-    return in.read(buffer, 0, buffer.length);
-  }
-
-  /**
-   * Read at most <code>length</code> bytes from this DataInputStream and stores
-   * them in byte array <code>buffer</code> starting at <code>offset</code>.
-   * Answer the number of bytes actually read or -1 if no bytes were read and
-   * end of stream was encountered.
-   *
-   * @param buffer
-   *          the byte array in which to store the read bytes.
-   * @param offset
-   *          the offset in <code>buffer</code> to store the read bytes.
-   * @param length
-   *          the maximum number of bytes to store in <code>buffer</code>.
-   * @return the number of bytes actually read or -1 if end of stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  @Deprecated
-  @Override
-  public final int read(byte[] buffer, int offset, int length)
-      throws IOException {
-    return in.read(buffer, offset, length);
-  }
-
-  /**
-   * Reads a boolean from this stream.
-   *
-   * @return the next boolean value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final boolean readBoolean() throws IOException {
-    int temp = in.read();
-    if (temp < 0) {
-      throw new EOFException();
-    }
-    return temp != 0;
-  }
-
-  /**
-   * Reads an 8-bit byte value from this stream.
-   *
-   * @return the next byte value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final byte readByte() throws IOException {
-    int temp = in.read();
-    if (temp < 0) {
-      throw new EOFException();
-    }
-    return (byte) temp;
-  }
-
-  /**
-   * Reads a 16-bit character value from this stream.
-   *
-   * @return the next <code>char</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  private int readToBuff(int count) throws IOException {
-    int offset = 0;
-
-    while (offset < count) {
-      int bytesRead = in.read(buff, offset, count - offset);
-      if (bytesRead == -1) {
-        return bytesRead;
-      }
-      offset += bytesRead;
-    }
-    return offset;
-  }
-
-  public final char readChar() throws IOException {
-    if (readToBuff(2) < 0) {
-      throw new EOFException();
-    }
-    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
-
-  }
-
-  /**
-   * Reads a 64-bit <code>double</code> value from this stream.
-   *
-   * @return the next <code>double</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final double readDouble() throws IOException {
-    return Double.longBitsToDouble(readLong());
-  }
-
-  /**
-   * Reads a 32-bit <code>float</code> value from this stream.
-   *
-   * @return the next <code>float</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final float readFloat() throws IOException {
-    return Float.intBitsToFloat(readInt());
-  }
-
-  /**
-   * Reads bytes from this stream into the byte array <code>buffer</code>. This
-   * method will block until <code>buffer.length</code> number of bytes have
-   * been read.
-   *
-   * @param buffer
-   *          to read bytes into
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final void readFully(byte[] buffer) throws IOException {
-    readFully(buffer, 0, buffer.length);
-  }
-
-  /**
-   * Reads bytes from this stream and stores them in the byte array
-   * <code>buffer</code> starting at the position <code>offset</code>. This
-   * method blocks until <code>count</code> bytes have been read.
-   *
-   * @param buffer
-   *          the byte array into which the data is read
-   * @param offset
-   *          the offset the operation start at
-   * @param length
-   *          the maximum number of bytes to read
-   *
-   * @throws java.io.IOException
-   *           if a problem occurs while reading from this stream
-   * @throws java.io.EOFException
-   *           if reaches the end of the stream before enough bytes have been
-   *           read
-   */
-  public final void readFully(byte[] buffer, int offset, int length)
-      throws IOException {
-    if (length < 0) {
-      throw new IndexOutOfBoundsException();
-    }
-    if (length == 0) {
-      return;
-    }
-    if (in == null || buffer == null) {
-      throw new NullPointerException("Null Pointer to underlying input stream");
-    }
-
-    if (offset < 0 || offset > buffer.length - length) {
-      throw new IndexOutOfBoundsException();
-    }
-    while (length > 0) {
-      int result = in.read(buffer, offset, length);
-      if (result < 0) {
-        throw new EOFException();
-      }
-      offset += result;
-      length -= result;
-    }
-  }
-
-  /**
-   * Reads a 32-bit integer value from this stream.
-   *
-   * @return the next <code>int</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final int readInt() throws IOException {
-    if (readToBuff(4) < 0) {
-      throw new EOFException();
-    }
-    return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
-        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
-  }
-
-  /**
-   * Answers a <code>String</code> representing the next line of text available
-   * in this BufferedReader. A line is represented by 0 or more characters
-   * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or
-   * end of stream. The <code>String</code> does not include the newline
-   * sequence.
-   *
-   * @return the contents of the line or null if no characters were read before
-   *         end of stream.
-   *
-   * @throws java.io.IOException
-   *           If the DataInputStream is already closed or some other IO error
-   *           occurs.
-   *
-   * @deprecated Use BufferedReader
-   */
-  @Deprecated
-  public final String readLine() throws IOException {
-    StringBuilder line = new StringBuilder(80); // Typical line length
-    boolean foundTerminator = false;
-    while (true) {
-      int nextByte = in.read();
-      switch (nextByte) {
-        case -1:
-          if (line.length() == 0 && !foundTerminator) {
-            return null;
-          }
-          return line.toString();
-        case (byte) '\r':
-          if (foundTerminator) {
-            ((PushbackInputStream) in).unread(nextByte);
-            return line.toString();
-          }
-          foundTerminator = true;
-        /* Have to be able to peek ahead one byte */
-          if (!(in.getClass() == PushbackInputStream.class)) {
-            in = new PushbackInputStream(in);
-          }
-          break;
-        case (byte) '\n':
-          return line.toString();
-        default:
-          if (foundTerminator) {
-            ((PushbackInputStream) in).unread(nextByte);
-            return line.toString();
-          }
-          line.append((char) nextByte);
-      }
-    }
-  }
-
-  /**
-   * Reads a 64-bit <code>long</code> value from this stream.
-   *
-   * @return the next <code>long</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final long readLong() throws IOException {
-    if (readToBuff(8) < 0) {
-      throw new EOFException();
-    }
-    int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16)
-        | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff);
-    int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16)
-        | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff);
-
-    return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL);
-  }
-
-  /**
-   * Reads a 16-bit <code>short</code> value from this stream.
-   *
-   * @return the next <code>short</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final short readShort() throws IOException {
-    if (readToBuff(2) < 0) {
-      throw new EOFException();
-    }
-    return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
-  }
-
-  /**
-   * Reads an unsigned 8-bit <code>byte</code> value from this stream and
-   * returns it as an int.
-   *
-   * @return the next unsigned byte value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final int readUnsignedByte() throws IOException {
-    int temp = in.read();
-    if (temp < 0) {
-      throw new EOFException();
-    }
-    return temp;
-  }
-
-  /**
-   * Reads a 16-bit unsigned <code>short</code> value from this stream and
-   * returns it as an int.
-   *
-   * @return the next unsigned <code>short</code> value from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final int readUnsignedShort() throws IOException {
-    if (readToBuff(2) < 0) {
-      throw new EOFException();
-    }
-    return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff));
-  }
-
-  /**
-   * Reads a UTF format String from this Stream.
-   *
-   * @return the next UTF String from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public final String readUTF() throws IOException {
-    return decodeUTF(readUnsignedShort());
-  }
-
-  String decodeUTF(int utfSize) throws IOException {
-    return decodeUTF(utfSize, this);
-  }
-
-  private static String decodeUTF(int utfSize, DataInput in) throws IOException {
-    byte[] buf = new byte[utfSize];
-    char[] out = new char[utfSize];
-    in.readFully(buf, 0, utfSize);
-
-    return convertUTF8WithBuf(buf, out, 0, utfSize);
-  }
-
-  /**
-   * Reads a UTF format String from the DataInput Stream <code>in</code>.
-   *
-   * @param in
-   *          the input stream to read from
-   * @return the next UTF String from the source stream.
-   *
-   * @throws java.io.IOException
-   *           If a problem occurs reading from this DataInputStream.
-   *
-   */
-  public static final String readUTF(DataInput in) throws IOException {
-    return decodeUTF(in.readUnsignedShort(), in);
-  }
-
-  /**
-   * Skips <code>count</code> number of bytes in this stream. Subsequent
-   * <code>read()</code>'s will not return these bytes unless
-   * <code>reset()</code> is used.
-   *
-   * @param count
-   *          the number of bytes to skip.
-   * @return the number of bytes actually skipped.
-   *
-   * @throws java.io.IOException
-   *           If the stream is already closed or another IOException occurs.
-   */
-  public final int skipBytes(int count) throws IOException {
-    int skipped = 0;
-    long skip;
-    while (skipped < count && (skip = in.skip(count - skipped)) != 0) {
-      skipped += skip;
-    }
-    if (skipped < 0) {
-      throw new EOFException();
-    }
-    return skipped;
-  }
-
-  public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset,
-                                          int utfSize) throws UTFDataFormatException {
-    int count = 0, s = 0, a;
-    while (count < utfSize) {
-      if ((out[s] = (char) buf[offset + count++]) < '\u0080') {
-        s++;
-      } else if (((a = out[s]) & 0xe0) == 0xc0) {
-        if (count >= utfSize) {
-          throw new UTFDataFormatException();
-        }
-        int b = buf[count++];
-        if ((b & 0xC0) != 0x80) {
-          throw new UTFDataFormatException();
-        }
-        out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F));
-      } else if ((a & 0xf0) == 0xe0) {
-        if (count + 1 >= utfSize) {
-          throw new UTFDataFormatException();
-        }
-        int b = buf[count++];
-        int c = buf[count++];
-        if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) {
-          throw new UTFDataFormatException();
-        }
-        out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F));
-      } else {
-        throw new UTFDataFormatException();
-      }
-    }
-    return new String(out, 0, s);
-  }
-
-  @Override
-  public void seek(long pos) throws IOException {
-    buffer.seek((int)pos);
-  }
-
-  @Override
-  public long getPos() throws IOException {
-    return buffer.getPosition();
-  }
-
-  @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
deleted file mode 100644
index 3944f38..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import java.io.DataInput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-/**
- * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all
- * synchronized modifiers.
- */
-public class NonSyncDataOutputBuffer extends DataOutputStream {
-
-  private final NonSyncByteArrayOutputStream buffer;
-
-  /** Constructs a new empty buffer. */
-  public NonSyncDataOutputBuffer() {
-    this(new NonSyncByteArrayOutputStream());
-  }
-
-  private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) {
-    super(buffer);
-    this.buffer = buffer;
-  }
-
-  /**
-   * Returns the current contents of the buffer. Data is only valid to
-   * {@link #getLength()}.
-   */
-  public byte[] getData() {
-    return buffer.getData();
-  }
-
-  /** Returns the length of the valid data currently in the buffer. */
-  public int getLength() {
-    return buffer.getLength();
-  }
-
-  /** Resets the buffer to empty. */
-  public NonSyncDataOutputBuffer reset() {
-    written = 0;
-    buffer.reset();
-    return this;
-  }
-
-  /** Writes bytes from a DataInput directly into the buffer. */
-  public void write(DataInput in, int length) throws IOException {
-    buffer.write(in, length);
-  }
-
-  @Override
-  public void write(int b) throws IOException {
-    buffer.write(b);
-    incCount(1);
-  }
-
-  @Override
-  public void write(byte b[], int off, int len) throws IOException {
-    buffer.write(b, off, len);
-    incCount(len);
-  }
-
-  public void writeTo(DataOutputStream out) throws IOException {
-    buffer.writeTo(out);
-  }
-
-  private void incCount(int value) {
-    if (written + value < 0) {
-      written = Integer.MAX_VALUE;
-    } else {
-      written += value;
-    }
-  }
-}