You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:40 UTC

[25/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
deleted file mode 100644
index 72472fc..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.SeekableInput;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericFixed;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.util.Utf8;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.common.TajoDataTypes.DataType;
-import org.apache.tajo.datum.*;
-import org.apache.tajo.storage.FileScanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * FileScanner for reading Avro files
- */
-public class AvroScanner extends FileScanner {
-  private Schema avroSchema;
-  private List<Schema.Field> avroFields;
-  private DataFileReader<GenericRecord> dataFileReader;
-  private int[] projectionMap;
-
-  /**
-   * Creates a new AvroScanner.
-   *
-   * @param conf
-   * @param schema
-   * @param meta
-   * @param fragment
-   */
-  public AvroScanner(Configuration conf,
-                     final org.apache.tajo.catalog.Schema schema,
-                     final TableMeta meta, final Fragment fragment) {
-    super(conf, schema, meta, fragment);
-  }
-
-  /**
-   * Initializes the AvroScanner.
-   */
-  @Override
-  public void init() throws IOException {
-    if (targets == null) {
-      targets = schema.toArray();
-    }
-    prepareProjection(targets);
-
-    avroSchema = AvroUtil.getAvroSchema(meta, conf);
-    avroFields = avroSchema.getFields();
-
-    DatumReader<GenericRecord> datumReader =
-        new GenericDatumReader<GenericRecord>(avroSchema);
-    SeekableInput input = new FsInput(fragment.getPath(), conf);
-    dataFileReader = new DataFileReader<GenericRecord>(input, datumReader);
-    super.init();
-  }
-
-  private void prepareProjection(Column[] targets) {
-    projectionMap = new int[targets.length];
-    for (int i = 0; i < targets.length; ++i) {
-      projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName());
-    }
-  }
-
-  private static String fromAvroString(Object value) {
-    if (value instanceof Utf8) {
-      Utf8 utf8 = (Utf8)value;
-      return utf8.toString();
-    }
-    return value.toString();
-  }
-
-  private static Schema getNonNull(Schema schema) {
-    if (!schema.getType().equals(Schema.Type.UNION)) {
-      return schema;
-    }
-    List<Schema> schemas = schema.getTypes();
-    if (schemas.size() != 2) {
-      return schema;
-    }
-    if (schemas.get(0).getType().equals(Schema.Type.NULL)) {
-      return schemas.get(1);
-    } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) {
-      return schemas.get(0);
-    } else {
-      return schema;
-    }
-  }
-
-  private Datum convertInt(Object value, TajoDataTypes.Type tajoType) {
-    int intValue = (Integer)value;
-    switch (tajoType) {
-      case BIT:
-        return DatumFactory.createBit((byte)(intValue & 0xff));
-      case INT2:
-        return DatumFactory.createInt2((short)intValue);
-      default:
-        return DatumFactory.createInt4(intValue);
-    }
-  }
-
-  private Datum convertBytes(Object value, TajoDataTypes.Type tajoType,
-                             DataType dataType) {
-    ByteBuffer buffer = (ByteBuffer)value;
-    byte[] bytes = new byte[buffer.capacity()];
-    buffer.get(bytes, 0, bytes.length);
-    switch (tajoType) {
-      case INET4:
-        return DatumFactory.createInet4(bytes);
-      case PROTOBUF:
-        try {
-          ProtobufDatumFactory factory =
-              ProtobufDatumFactory.get(dataType.getCode());
-          Message.Builder builder = factory.newBuilder();
-          builder.mergeFrom(bytes);
-          return factory.createDatum(builder);
-        } catch (InvalidProtocolBufferException e) {
-          throw new RuntimeException(e);
-        }
-      default:
-        return new BlobDatum(bytes);
-    }
-  }
-
-  private Datum convertString(Object value, TajoDataTypes.Type tajoType) {
-    switch (tajoType) {
-      case CHAR:
-        return DatumFactory.createChar(fromAvroString(value));
-      default:
-        return DatumFactory.createText(fromAvroString(value));
-    }
-  }
-
-  /**
-   * Reads the next Tuple from the Avro file.
-   *
-   * @return The next Tuple from the Avro file or null if end of file is
-   *         reached.
-   */
-  @Override
-  public Tuple next() throws IOException {
-    if (!dataFileReader.hasNext()) {
-      return null;
-    }
-
-    Tuple tuple = new VTuple(schema.size());
-    GenericRecord record = dataFileReader.next();
-    for (int i = 0; i < projectionMap.length; ++i) {
-      int columnIndex = projectionMap[i];
-      Object value = record.get(columnIndex);
-      if (value == null) {
-        tuple.put(columnIndex, NullDatum.get());
-        continue;
-      }
-
-      // Get Avro type.
-      Schema.Field avroField = avroFields.get(columnIndex);
-      Schema nonNullAvroSchema = getNonNull(avroField.schema());
-      Schema.Type avroType = nonNullAvroSchema.getType();
-
-      // Get Tajo type.
-      Column column = schema.getColumn(columnIndex);
-      DataType dataType = column.getDataType();
-      TajoDataTypes.Type tajoType = dataType.getType();
-      switch (avroType) {
-        case NULL:
-          tuple.put(columnIndex, NullDatum.get());
-          break;
-        case BOOLEAN:
-          tuple.put(columnIndex, DatumFactory.createBool((Boolean)value));
-          break;
-        case INT:
-          tuple.put(columnIndex, convertInt(value, tajoType));
-          break;
-        case LONG:
-          tuple.put(columnIndex, DatumFactory.createInt8((Long)value));
-          break;
-        case FLOAT:
-          tuple.put(columnIndex, DatumFactory.createFloat4((Float)value));
-          break;
-        case DOUBLE:
-          tuple.put(columnIndex, DatumFactory.createFloat8((Double)value));
-          break;
-        case BYTES:
-          tuple.put(columnIndex, convertBytes(value, tajoType, dataType));
-          break;
-        case STRING:
-          tuple.put(columnIndex, convertString(value, tajoType));
-          break;
-        case RECORD:
-          throw new RuntimeException("Avro RECORD not supported.");
-        case ENUM:
-          throw new RuntimeException("Avro ENUM not supported.");
-        case MAP:
-          throw new RuntimeException("Avro MAP not supported.");
-        case UNION:
-          throw new RuntimeException("Avro UNION not supported.");
-        case FIXED:
-          tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes()));
-          break;
-        default:
-          throw new RuntimeException("Unknown type.");
-      }
-    }
-    return tuple;
-  }
-
-  /**
-   * Resets the scanner
-   */
-  @Override
-  public void reset() throws IOException {
-  }
-
-  /**
-   * Closes the scanner.
-   */
-  @Override
-  public void close() throws IOException {
-    if (dataFileReader != null) {
-      dataFileReader.close();
-    }
-  }
-
-  /**
-   * Returns whether this scanner is projectable.
-   *
-   * @return true
-   */
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  /**
-   * Returns whether this scanner is selectable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSelectable() {
-    return false;
-  }
-
-  /**
-   * Returns whether this scanner is splittable.
-   *
-   * @return false
-   */
-  @Override
-  public boolean isSplittable() {
-    return false;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
deleted file mode 100644
index 0d14c3d..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.avro;
-
-import org.apache.avro.Schema;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.StorageConstants;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-
-public class AvroUtil {
-  public static Schema getAvroSchema(TableMeta meta, Configuration conf)
-      throws IOException {
-
-    boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL);
-    boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL);
-    if (!isSchemaLiteral && !isSchemaUrl) {
-      throw new RuntimeException("No Avro schema for table.");
-    }
-    if (isSchemaLiteral) {
-      String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL);
-      return new Schema.Parser().parse(schema);
-    }
-
-    String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL);
-    if (schemaURL.toLowerCase().startsWith("http")) {
-      return getAvroSchemaFromHttp(schemaURL);
-    } else {
-      return getAvroSchemaFromFileSystem(schemaURL, conf);
-    }
-  }
-
-  public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException {
-    InputStream inputStream = new URL(schemaURL).openStream();
-
-    try {
-      return new Schema.Parser().parse(inputStream);
-    } finally {
-      IOUtils.closeStream(inputStream);
-    }
-  }
-
-  public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException {
-    Path schemaPath = new Path(schemaURL);
-    FileSystem fs = schemaPath.getFileSystem(conf);
-    FSDataInputStream inputStream = fs.open(schemaPath);
-
-    try {
-      return new Schema.Parser().parse(inputStream);
-    } finally {
-      IOUtils.closeStream(inputStream);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
deleted file mode 100644
index 40d1545..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * <p>
- * Provides read and write support for Avro files. Avro schemas are
- * converted to Tajo schemas according to the following mapping of Avro
- * and Tajo types:
- * </p>
- *
- * <table>
- *   <tr>
- *     <th>Avro type</th>
- *     <th>Tajo type</th>
- *   </tr>
- *   <tr>
- *     <td>NULL</td>
- *     <td>NULL_TYPE</td>
- *   </tr>
- *   <tr>
- *     <td>BOOLEAN</td>
- *     <td>BOOLEAN</td>
- *   </tr>
- *   <tr>
- *     <td>INT</td>
- *     <td>INT4</td>
- *   </tr>
- *   <tr>
- *     <td>LONG</td>
- *     <td>INT8</td>
- *   </tr>
- *   <tr>
- *     <td>FLOAT</td>
- *     <td>FLOAT4</td>
- *   </tr>
- *   <tr>
- *     <td>DOUBLE</td>
- *     <td>FLOAT8</td>
- *   </tr>
- *   <tr>
- *     <td>BYTES</td>
- *     <td>BLOB</td>
- *   </tr>
- *   <tr>
- *     <td>STRING</td>
- *     <td>TEXT</td>
- *   </tr>
- *   <tr>
- *     <td>FIXED</td>
- *     <td>BLOB</td>
- *   </tr>
- *   <tr>
- *     <td>RECORD</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- *   <tr>
- *     <td>ENUM</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- *   <tr>
- *     <td>MAP</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- *   <tr>
- *     <td>UNION</td>
- *     <td>Not currently supported.</td>
- *   </tr>
- * </table>
- */
-
-package org.apache.tajo.storage.avro;

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
deleted file mode 100644
index baeda8c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.compress;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-import org.apache.hadoop.io.compress.DoNotPool;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A global compressor/decompressor pool used to save and reuse (possibly
- * native) compression/decompression codecs.
- */
-public final class CodecPool {
-  private static final Log LOG = LogFactory.getLog(CodecPool.class);
-
-  /**
-   * A global compressor pool used to save the expensive
-   * construction/destruction of (possibly native) compression codecs.
-   */
-  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
-      new HashMap<Class<Compressor>, List<Compressor>>();
-
-  /**
-   * A global decompressor pool used to save the expensive
-   * construction/destruction of (possibly native) decompression codecs.
-   */
-  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
-      new HashMap<Class<Decompressor>, List<Decompressor>>();
-
-  private static <T> T borrow(Map<Class<T>, List<T>> pool,
-      Class<? extends T> codecClass) {
-    T codec = null;
-
-    // Check if an appropriate codec is available
-    synchronized (pool) {
-      if (pool.containsKey(codecClass)) {
-        List<T> codecList = pool.get(codecClass);
-
-        if (codecList != null) {
-          synchronized (codecList) {
-            if (!codecList.isEmpty()) {
-              codec = codecList.remove(codecList.size() - 1);
-            }
-          }
-        }
-      }
-    }
-
-    return codec;
-  }
-
-  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
-    if (codec != null) {
-      Class<T> codecClass = (Class<T>) codec.getClass();
-      synchronized (pool) {
-        if (!pool.containsKey(codecClass)) {
-          pool.put(codecClass, new ArrayList<T>());
-        }
-
-        List<T> codecList = pool.get(codecClass);
-        synchronized (codecList) {
-          codecList.add(codec);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Compressor</code>
-   * @param conf the <code>Configuration</code> object which contains the settings used to create or reinitialize the compressor
-   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
-   *         from the pool or a new one
-   */
-  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
-    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
-    if (compressor == null) {
-      compressor = codec.createCompressor();
-      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
-    } else {
-      compressor.reinit(conf);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Got recycled compressor");
-      }
-    }
-    return compressor;
-  }
-
-  public static Compressor getCompressor(CompressionCodec codec) {
-    return getCompressor(codec, null);
-  }
-
-  /**
-   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Decompressor</code>
-   * @return <code>Decompressor</code> for the given
-   *         <code>CompressionCodec</code> from the pool or a new one
-   */
-  public static Decompressor getDecompressor(CompressionCodec codec) {
-    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
-        .getDecompressorType());
-    if (decompressor == null) {
-      decompressor = codec.createDecompressor();
-      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
-    } else {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Got recycled decompressor");
-      }
-    }
-    return decompressor;
-  }
-
-  /**
-   * Return the {@link Compressor} to the pool.
-   *
-   * @param compressor
-   *          the <code>Compressor</code> to be returned to the pool
-   */
-  public static void returnCompressor(Compressor compressor) {
-    if (compressor == null) {
-      return;
-    }
-    // if the compressor can't be reused, don't pool it.
-    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
-      return;
-    }
-    compressor.reset();
-    payback(COMPRESSOR_POOL, compressor);
-  }
-
-  /**
-   * Return the {@link Decompressor} to the pool.
-   *
-   * @param decompressor
-   *          the <code>Decompressor</code> to be returned to the pool
-   */
-  public static void returnDecompressor(Decompressor decompressor) {
-    if (decompressor == null) {
-      return;
-    }
-    // if the decompressor can't be reused, don't pool it.
-    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
-      return;
-    }
-    decompressor.reset();
-    payback(DECOMPRESSOR_POOL, decompressor);
-  }
-
-  private CodecPool() {
-    // prevent instantiation
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
deleted file mode 100644
index bb035a8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.exception;
-
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-public class AlreadyExistsStorageException extends IOException {
-  private static final long serialVersionUID = 965518916144019032L;
-
-
-  public AlreadyExistsStorageException(String path) {
-    super("Error: "+path+" alreay exists");    
-  }
-  
-  public AlreadyExistsStorageException(Path path) {
-    this(path.toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
deleted file mode 100644
index a67d1f7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
-public class UnknownCodecException extends Exception {
-
-  private static final long serialVersionUID = 4287230843540404529L;
-
-  public UnknownCodecException() {
-
-  }
-
-  public UnknownCodecException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
deleted file mode 100644
index d18b5a0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.exception;
-
-public class UnknownDataTypeException extends Exception {
-
-  private static final long serialVersionUID = -2630390595968966164L;
-
-  public UnknownDataTypeException() {
-
-  }
-
-  public UnknownDataTypeException(String message) {
-    super(message);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
deleted file mode 100644
index 8b197d6..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage.exception;
-
-public class UnsupportedFileTypeException extends RuntimeException {
-	private static final long serialVersionUID = -8160289695849000342L;
-
-	public UnsupportedFileTypeException() {
-	}
-
-	/**
-	 * @param message
-	 */
-	public UnsupportedFileTypeException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
deleted file mode 100644
index 4a83dbf..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.util.TUtil;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable {
-  @Expose private String tableName; // required
-  @Expose private Path uri; // required
-  @Expose public Long startOffset; // required
-  @Expose public Long length; // required
-
-  private String[] hosts; // Datanode hostnames
-  @Expose private int[] diskIds;
-
-  public FileFragment(ByteString raw) throws InvalidProtocolBufferException {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    builder.build();
-    init(builder.build());
-  }
-
-  public FileFragment(String tableName, Path uri, BlockLocation blockLocation)
-      throws IOException {
-    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null);
-  }
-
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) {
-    this.set(tableName, uri, start, length, hosts, diskIds);
-  }
-  // Non splittable
-  public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) {
-    this.set(tableName, uri, start, length, hosts, null);
-  }
-
-  public FileFragment(String fragmentId, Path path, long start, long length) {
-    this.set(fragmentId, path, start, length, null, null);
-  }
-
-  public FileFragment(FileFragmentProto proto) {
-    init(proto);
-  }
-
-  private void init(FileFragmentProto proto) {
-    int[] diskIds = new int[proto.getDiskIdsList().size()];
-    int i = 0;
-    for(Integer eachValue: proto.getDiskIdsList()) {
-      diskIds[i++] = eachValue;
-    }
-    this.set(proto.getId(), new Path(proto.getPath()),
-        proto.getStartOffset(), proto.getLength(),
-        proto.getHostsList().toArray(new String[]{}),
-        diskIds);
-  }
-
-  private void set(String tableName, Path path, long start,
-      long length, String[] hosts, int[] diskIds) {
-    this.tableName = tableName;
-    this.uri = path;
-    this.startOffset = start;
-    this.length = length;
-    this.hosts = hosts;
-    this.diskIds = diskIds;
-  }
-
-
-  /**
-   * Get the list of hosts (hostname) hosting this block
-   */
-  public String[] getHosts() {
-    if (hosts == null) {
-      this.hosts = new String[0];
-    }
-    return hosts;
-  }
-
-  /**
-   * Get the list of Disk Ids
-   * Unknown disk is -1. Others 0 ~ N
-   */
-  public int[] getDiskIds() {
-    if (diskIds == null) {
-      this.diskIds = new int[getHosts().length];
-      Arrays.fill(this.diskIds, -1);
-    }
-    return diskIds;
-  }
-
-  public void setDiskIds(int[] diskIds){
-    this.diskIds = diskIds;
-  }
-
-  @Override
-  public String getTableName() {
-    return this.tableName;
-  }
-
-  public Path getPath() {
-    return this.uri;
-  }
-
-  public void setPath(Path path) {
-    this.uri = path;
-  }
-
-  public Long getStartKey() {
-    return this.startOffset;
-  }
-
-  @Override
-  public String getKey() {
-    return this.uri.toString();
-  }
-
-  @Override
-  public long getLength() {
-    return this.length;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return this.length <= 0;
-  }
-  /**
-   * 
-   * The offset range of tablets <b>MUST NOT</b> be overlapped.
-   * 
-   * @param t
-   * @return If the table paths are not same, return -1.
-   */
-  @Override
-  public int compareTo(FileFragment t) {
-    if (getPath().equals(t.getPath())) {
-      long diff = this.getStartKey() - t.getStartKey();
-      if (diff < 0) {
-        return -1;
-      } else if (diff > 0) {
-        return 1;
-      } else {
-        return 0;
-      }
-    } else {
-      return -1;
-    }
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (o instanceof FileFragment) {
-      FileFragment t = (FileFragment) o;
-      if (getPath().equals(t.getPath())
-          && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
-          && TUtil.checkEquals(t.getLength(), this.getLength())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(tableName, uri, startOffset, length);
-  }
-  
-  public Object clone() throws CloneNotSupportedException {
-    FileFragment frag = (FileFragment) super.clone();
-    frag.tableName = tableName;
-    frag.uri = uri;
-    frag.diskIds = diskIds;
-    frag.hosts = hosts;
-
-    return frag;
-  }
-
-  @Override
-  public String toString() {
-    return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
-    		+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
-        + getLength() + "}" ;
-  }
-
-  public FragmentProto getProto() {
-    FileFragmentProto.Builder builder = FileFragmentProto.newBuilder();
-    builder.setId(this.tableName);
-    builder.setStartOffset(this.startOffset);
-    builder.setLength(this.length);
-    builder.setPath(this.uri.toString());
-    if(diskIds != null) {
-      List<Integer> idList = new ArrayList<Integer>();
-      for(int eachId: diskIds) {
-        idList.add(eachId);
-      }
-      builder.addAllDiskIds(idList);
-    }
-
-    if(hosts != null) {
-      builder.addAllHosts(TUtil.newList(hosts));
-    }
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setStoreType(StoreType.CSV.name());
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    return fragmentBuilder.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
deleted file mode 100644
index ac43197..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import org.apache.tajo.common.ProtoObject;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-public interface Fragment extends ProtoObject<FragmentProto> {
-
-  public abstract String getTableName();
-
-  @Override
-  public abstract FragmentProto getProto();
-
-  public abstract long getLength();
-
-  public abstract String getKey();
-
-  public String[] getHosts();
-
-  public abstract boolean isEmpty();
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
deleted file mode 100644
index 07720c7..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.fragment;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.protobuf.ByteString;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tajo.annotation.ThreadSafe;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-
-@ThreadSafe
-public class FragmentConvertor {
-  /**
-   * Cache of fragment classes
-   */
-  protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap();
-  /**
-   * Cache of constructors for each class.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap();
-  /**
-   * default parameter for all constructors
-   */
-  private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class };
-
-  public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType)
-  throws IOException {
-    Class<? extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase());
-    if (fragmentClass == null) {
-      fragmentClass = conf.getClass(
-          String.format("tajo.storage.fragment.%s.class", storeType.toLowerCase()), null, Fragment.class);
-      CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass);
-    }
-
-    if (fragmentClass == null) {
-      throw new IOException("No such a fragment for " + storeType.toLowerCase());
-    }
-
-    return fragmentClass;
-  }
-
-  public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) {
-    T result;
-    try {
-      Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz);
-      if (constructor == null) {
-        constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS);
-        constructor.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(clazz, constructor);
-      }
-      result = constructor.newInstance(new Object[]{fragment.getContents()});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment)
-      throws IOException {
-    Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType().toLowerCase());
-    if (fragmentClass == null) {
-      throw new IOException("No such a fragment class for " + fragment.getStoreType());
-    }
-    return convert(fragmentClass, fragment);
-  }
-
-  public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments)
-      throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add(convert(clazz, proto));
-    }
-    return list;
-  }
-
-  public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException {
-    List<T> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (FragmentProto proto : fragments) {
-      list.add((T) convert(conf, proto));
-    }
-    return list;
-  }
-
-  public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) {
-    List<FragmentProto> list = Lists.newArrayList();
-    if (fragments == null) {
-      return list;
-    }
-    for (Fragment fragment : fragments) {
-      list.add(fragment.getProto());
-    }
-    return list;
-  }
-
-  public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) {
-    List<FragmentProto> list = toFragmentProtoList(fragments);
-    return list.toArray(new FragmentProto[list.size()]);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
deleted file mode 100644
index 8615235..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.TableStatistics;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.util.TUtil;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An abstract class for HBase appender.
- */
-public abstract class AbstractHBaseAppender implements Appender {
-  protected Configuration conf;
-  protected Schema schema;
-  protected TableMeta meta;
-  protected QueryUnitAttemptId taskAttemptId;
-  protected Path stagingDir;
-  protected boolean inited = false;
-
-  protected ColumnMapping columnMapping;
-  protected TableStatistics stats;
-  protected boolean enabledStats;
-
-  protected int columnNum;
-
-  protected byte[][][] mappingColumnFamilies;
-  protected boolean[] isBinaryColumns;
-  protected boolean[] isRowKeyMappings;
-  protected boolean[] isColumnKeys;
-  protected boolean[] isColumnValues;
-  protected int[] rowKeyFieldIndexes;
-  protected int[] rowkeyColumnIndexes;
-  protected char rowKeyDelimiter;
-
-  // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping
-  protected int[] columnKeyValueDataIndexes;
-  protected byte[][] columnKeyDatas;
-  protected byte[][] columnValueDatas;
-  protected byte[][] columnKeyCfNames;
-
-  protected KeyValue[] keyValues;
-
-  public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
-                       Schema schema, TableMeta meta, Path stagingDir) {
-    this.conf = conf;
-    this.schema = schema;
-    this.meta = meta;
-    this.stagingDir = stagingDir;
-    this.taskAttemptId = taskAttemptId;
-  }
-
-  @Override
-  public void init() throws IOException {
-    if (inited) {
-      throw new IllegalStateException("FileAppender is already initialized.");
-    }
-    inited = true;
-    if (enabledStats) {
-      stats = new TableStatistics(this.schema);
-    }
-    columnMapping = new ColumnMapping(schema, meta);
-
-    mappingColumnFamilies = columnMapping.getMappingColumns();
-
-    isRowKeyMappings = columnMapping.getIsRowKeyMappings();
-    List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>();
-    for (int i = 0; i < isRowKeyMappings.length; i++) {
-      if (isRowKeyMappings[i]) {
-        rowkeyColumnIndexList.add(i);
-      }
-    }
-    rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList);
-
-    isBinaryColumns = columnMapping.getIsBinaryColumns();
-    isColumnKeys = columnMapping.getIsColumnKeys();
-    isColumnValues = columnMapping.getIsColumnValues();
-    rowKeyDelimiter = columnMapping.getRowKeyDelimiter();
-    rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes();
-
-    this.columnNum = schema.size();
-
-    // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value
-    // which are mapped to the same column family.
-    columnKeyValueDataIndexes = new int[isColumnKeys.length];
-    int index = 0;
-    int numKeyValues = 0;
-    Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>();
-    for (int i = 0; i < isColumnKeys.length; i++) {
-      if (isRowKeyMappings[i]) {
-        continue;
-      }
-      if (isColumnKeys[i] || isColumnValues[i]) {
-        String cfName = new String(mappingColumnFamilies[i][0]);
-        if (!cfNameIndexMap.containsKey(cfName)) {
-          cfNameIndexMap.put(cfName, index);
-          columnKeyValueDataIndexes[i] = index;
-          index++;
-          numKeyValues++;
-        } else {
-          columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName);
-        }
-      } else {
-        numKeyValues++;
-      }
-    }
-    columnKeyCfNames = new byte[cfNameIndexMap.size()][];
-    for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) {
-      columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes();
-    }
-    columnKeyDatas = new byte[cfNameIndexMap.size()][];
-    columnValueDatas = new byte[cfNameIndexMap.size()][];
-
-    keyValues = new KeyValue[numKeyValues];
-  }
-
-  private ByteArrayOutputStream bout = new ByteArrayOutputStream();
-
-  protected byte[] getRowKeyBytes(Tuple tuple) throws IOException {
-    Datum datum;
-    byte[] rowkey;
-    if (rowkeyColumnIndexes.length > 1) {
-      bout.reset();
-      for (int i = 0; i < rowkeyColumnIndexes.length; i++) {
-        datum = tuple.get(rowkeyColumnIndexes[i]);
-        if (isBinaryColumns[rowkeyColumnIndexes[i]]) {
-          rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
-        } else {
-          rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum);
-        }
-        bout.write(rowkey);
-        if (i < rowkeyColumnIndexes.length - 1) {
-          bout.write(rowKeyDelimiter);
-        }
-      }
-      rowkey = bout.toByteArray();
-    } else {
-      int index = rowkeyColumnIndexes[0];
-      datum = tuple.get(index);
-      if (isBinaryColumns[index]) {
-        rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum);
-      } else {
-        rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum);
-      }
-    }
-
-    return rowkey;
-  }
-
-  protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException {
-    int keyValIndex = 0;
-    for (int i = 0; i < columnNum; i++) {
-      if (isRowKeyMappings[i]) {
-        continue;
-      }
-      Datum datum = tuple.get(i);
-      byte[] value;
-      if (isBinaryColumns[i]) {
-        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
-      } else {
-        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
-      }
-
-      if (isColumnKeys[i]) {
-        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
-      } else if (isColumnValues[i]) {
-        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
-      } else {
-        keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
-        keyValIndex++;
-      }
-    }
-
-    for (int i = 0; i < columnKeyDatas.length; i++) {
-      keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
-    }
-  }
-
-  @Override
-  public void enableStats() {
-    enabledStats = true;
-  }
-
-  @Override
-  public TableStats getStats() {
-    if (enabledStats) {
-      return stats.getTableStat();
-    } else {
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
deleted file mode 100644
index 8044494..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.SortSpec;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.plan.LogicalPlan;
-import org.apache.tajo.plan.PlanningException;
-import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.plan.logical.SortNode.SortPurpose;
-import org.apache.tajo.plan.rewrite.RewriteRule;
-import org.apache.tajo.plan.util.PlannerUtil;
-
-public class AddSortForInsertRewriter implements RewriteRule {
-  private int[] sortColumnIndexes;
-  private Column[] sortColumns;
-  public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) {
-    this.sortColumns = sortColumns;
-    this.sortColumnIndexes = new int[sortColumns.length];
-
-    Schema tableSchema = tableDesc.getSchema();
-    for (int i = 0; i < sortColumns.length; i++) {
-      sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName());
-    }
-  }
-
-  @Override
-  public String getName() {
-    return "AddSortForInsertRewriter";
-  }
-
-  @Override
-  public boolean isEligible(LogicalPlan plan) {
-    StoreType storeType = PlannerUtil.getStoreType(plan);
-    return storeType != null;
-  }
-
-  @Override
-  public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException {
-    LogicalRootNode rootNode = plan.getRootBlock().getRoot();
-    UnaryNode insertNode = rootNode.getChild();
-    LogicalNode childNode = insertNode.getChild();
-
-    Schema sortSchema = childNode.getOutSchema();
-    SortNode sortNode = plan.createNode(SortNode.class);
-    sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED);
-    sortNode.setInSchema(sortSchema);
-    sortNode.setOutSchema(sortSchema);
-
-    SortSpec[] sortSpecs = new SortSpec[sortColumns.length];
-    int index = 0;
-
-    for (int i = 0; i < sortColumnIndexes.length; i++) {
-      Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]);
-      if (sortColumn == null) {
-        throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]);
-      }
-      sortSpecs[index++] = new SortSpec(sortColumn, true, true);
-    }
-    sortNode.setSortSpecs(sortSpecs);
-
-    sortNode.setChild(insertNode.getChild());
-    insertNode.setChild(sortNode);
-    plan.getRootBlock().registerNode(sortNode);
-
-    return plan;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
deleted file mode 100644
index f80bd5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-public class ColumnMapping {
-  private TableMeta tableMeta;
-  private Schema schema;
-  private char rowKeyDelimiter;
-
-  private String hbaseTableName;
-
-  private int[] rowKeyFieldIndexes;
-  private boolean[] isRowKeyMappings;
-  private boolean[] isBinaryColumns;
-  private boolean[] isColumnKeys;
-  private boolean[] isColumnValues;
-
-  // schema order -> 0: cf name, 1: column name -> name bytes
-  private byte[][][] mappingColumns;
-
-  private int numRowKeys;
-
-  public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException {
-    this.schema = schema;
-    this.tableMeta = tableMeta;
-
-    init();
-  }
-
-  private void init() throws IOException {
-    hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY);
-    String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim();
-    if (delim.length() > 0) {
-      rowKeyDelimiter = delim.charAt(0);
-    }
-    isRowKeyMappings = new boolean[schema.size()];
-    rowKeyFieldIndexes = new int[schema.size()];
-    isBinaryColumns = new boolean[schema.size()];
-    isColumnKeys = new boolean[schema.size()];
-    isColumnValues = new boolean[schema.size()];
-
-    mappingColumns = new byte[schema.size()][][];
-
-    for (int i = 0; i < schema.size(); i++) {
-      rowKeyFieldIndexes[i] = -1;
-    }
-
-    String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, "");
-    if (columnMapping == null || columnMapping.isEmpty()) {
-      throw new IOException("'columns' property is required.");
-    }
-
-    String[] columnMappingTokens = columnMapping.split(",");
-
-    if (columnMappingTokens.length != schema.getColumns().size()) {
-      throw new IOException("The number of mapped HBase columns is great than the number of Tajo table columns");
-    }
-
-    int index = 0;
-    for (String eachToken: columnMappingTokens) {
-      mappingColumns[index] = new byte[2][];
-
-      byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':');
-
-      if (mappingTokens.length == 3) {
-        if (mappingTokens[0].length == 0) {
-          // cfname
-          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
-              "or '<cfname>:value:' or '<cfname>:value:#b'");
-        }
-        //<cfname>:key: or <cfname>:value:
-        if (mappingTokens[2].length != 0) {
-          String binaryOption = new String(mappingTokens[2]);
-          if ("#b".equals(binaryOption)) {
-            isBinaryColumns[index] = true;
-          } else {
-            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " +
-                "or '<cfname>:value:' or '<cfname>:value:#b'");
-          }
-        }
-        mappingColumns[index][0] = mappingTokens[0];
-        String keyOrValue = new String(mappingTokens[1]);
-        if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
-          isColumnKeys[index] = true;
-        } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) {
-          isColumnValues[index] = true;
-        } else {
-          throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
-        }
-      } else if (mappingTokens.length == 2) {
-        //<cfname>: or <cfname>:<qualifier> or :key
-        String cfName = new String(mappingTokens[0]);
-        String columnName = new String(mappingTokens[1]);
-        RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName);
-        if (rowKeyMapping != null) {
-          isRowKeyMappings[index] = true;
-          numRowKeys++;
-          isBinaryColumns[index] = rowKeyMapping.isBinary();
-          if (!cfName.isEmpty()) {
-            if (rowKeyDelimiter == 0) {
-              throw new IOException("hbase.rowkey.delimiter is required.");
-            }
-            rowKeyFieldIndexes[index] = Integer.parseInt(cfName);
-          } else {
-            rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column.
-          }
-        } else {
-          if (cfName.isEmpty()) {
-            throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'");
-          }
-          if (cfName != null) {
-            mappingColumns[index][0] = Bytes.toBytes(cfName);
-          }
-
-          if (columnName != null && !columnName.isEmpty()) {
-            String[] columnNameTokens = columnName.split("#");
-            if (columnNameTokens[0].isEmpty()) {
-              mappingColumns[index][1] = null;
-            } else {
-              mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]);
-            }
-            if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) {
-              isBinaryColumns[index] = true;
-            }
-          }
-        }
-      } else {
-        throw new IOException(eachToken + " 'column' attribute '[cfname]:[qualfier]:'");
-      }
-
-      index++;
-    } // for loop
-  }
-
-  public List<String> getColumnFamilyNames() {
-    List<String> cfNames = new ArrayList<String>();
-
-    for (byte[][] eachCfName: mappingColumns) {
-      if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) {
-        String cfName = new String(eachCfName[0]);
-        if (!cfNames.contains(cfName)) {
-          cfNames.add(cfName);
-        }
-      }
-    }
-
-    return cfNames;
-  }
-
-  private RowKeyMapping getRowKeyMapping(String cfName, String columnName) {
-    if (columnName == null || columnName.isEmpty()) {
-      return null;
-    }
-
-    String[] tokens = columnName.split("#");
-    if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) {
-      return null;
-    }
-
-    RowKeyMapping rowKeyMapping = new RowKeyMapping();
-
-    if (tokens.length == 2 && "b".equals(tokens[1])) {
-      rowKeyMapping.setBinary(true);
-    }
-
-    if (cfName != null && !cfName.isEmpty()) {
-      rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName));
-    }
-    return rowKeyMapping;
-  }
-
-  public char getRowKeyDelimiter() {
-    return rowKeyDelimiter;
-  }
-
-  public int[] getRowKeyFieldIndexes() {
-    return rowKeyFieldIndexes;
-  }
-
-  public boolean[] getIsRowKeyMappings() {
-    return isRowKeyMappings;
-  }
-
-  public byte[][][] getMappingColumns() {
-    return mappingColumns;
-  }
-
-  public Schema getSchema() {
-    return schema;
-  }
-
-  public boolean[] getIsBinaryColumns() {
-    return isBinaryColumns;
-  }
-
-  public String getHbaseTableName() {
-    return hbaseTableName;
-  }
-
-  public boolean[] getIsColumnKeys() {
-    return isColumnKeys;
-  }
-
-  public int getNumRowKeys() {
-    return numRowKeys;
-  }
-
-  public boolean[] getIsColumnValues() {
-    return isColumnValues;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
deleted file mode 100644
index c05c5bb..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.util.Bytes;
-
-import java.io.IOException;
-
-/**
- * Converts between {@link Datum} values and the raw byte arrays produced by {@link Bytes},
- * choosing the encoding from the column's data type.
- */
-public class HBaseBinarySerializerDeserializer {
-
-  /**
-   * Decodes a raw byte array into a datum of the column's type.
-   *
-   * <p>A null array always yields {@link NullDatum}. An empty array yields
-   * {@link NullDatum} for the numeric types and an empty text datum for TEXT.
-   * Types other than INT1/INT2/INT4/INT8/FLOAT4/FLOAT8/TEXT yield {@link NullDatum}.
-   *
-   * @param col   column whose data type selects the decoding
-   * @param bytes raw bytes to decode; may be null
-   * @return the decoded datum, never null
-   * @throws IOException declared for callers; not thrown by the code in this method
-   */
-  public static Datum deserialize(Column col, byte[] bytes) throws IOException {
-    // Every type maps a null payload to NullDatum. Checking once up front also
-    // keeps the INT8 branch from dereferencing a null array when it inspects
-    // the payload length.
-    if (bytes == null) {
-      return NullDatum.get();
-    }
-
-    boolean empty = bytes.length == 0;
-    Datum datum;
-    switch (col.getDataType().getType()) {
-      case INT1:
-      case INT2:
-        // INT1 shares the short decoding used for INT2.
-        datum = empty ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes));
-        break;
-      case INT4:
-        datum = empty ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes));
-        break;
-      case INT8:
-        if (empty) {
-          datum = NullDatum.get();
-        } else if (bytes.length == 4) {
-          // A 4-byte payload is decoded as an int and widened to INT8.
-          datum = DatumFactory.createInt8(Bytes.toInt(bytes));
-        } else {
-          datum = DatumFactory.createInt8(Bytes.toLong(bytes));
-        }
-        break;
-      case FLOAT4:
-        datum = empty ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes));
-        break;
-      case FLOAT8:
-        datum = empty ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes));
-        break;
-      case TEXT:
-        // Unlike the numeric types, an empty payload is kept as an empty text value.
-        datum = DatumFactory.createText(bytes);
-        break;
-      default:
-        datum = NullDatum.get();
-        break;
-    }
-    return datum;
-  }
-
-  /**
-   * Encodes a datum into the byte representation used for the column's type.
-   *
-   * @param col   column whose data type selects the encoding
-   * @param datum value to encode; may be null
-   * @return the encoded bytes, or null when {@code datum} is null, is a
-   *         {@link NullDatum}, or the column type is not one of
-   *         INT1/INT2/INT4/INT8/FLOAT4/FLOAT8/TEXT
-   * @throws IOException declared for callers; not thrown by the code in this method
-   */
-  public static byte[] serialize(Column col, Datum datum) throws IOException {
-    if (datum == null || datum instanceof NullDatum) {
-      return null;
-    }
-
-    byte[] bytes;
-    switch (col.getDataType().getType()) {
-      case INT1:
-      case INT2:
-        bytes = Bytes.toBytes(datum.asInt2());
-        break;
-      case INT4:
-        bytes = Bytes.toBytes(datum.asInt4());
-        break;
-      case INT8:
-        bytes = Bytes.toBytes(datum.asInt8());
-        break;
-      case FLOAT4:
-        bytes = Bytes.toBytes(datum.asFloat4());
-        break;
-      case FLOAT8:
-        bytes = Bytes.toBytes(datum.asFloat8());
-        break;
-      case TEXT:
-        bytes = Bytes.toBytes(datum.asChars());
-        break;
-      default:
-        bytes = null;
-        break;
-    }
-
-    return bytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
deleted file mode 100644
index 43ad7f3..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import com.google.common.base.Objects;
-import com.google.gson.annotations.Expose;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.storage.fragment.StorageFragmentProtos.HBaseFragmentProto;
-
-/**
- * {@link Fragment} implementation that carries a table name, an HBase table name, a
- * start/stop row pair and a region location.
- *
- * <p>{@link #equals(Object)} and {@link #hashCode()} are based on {@code tableName},
- * {@code startRow} and {@code stopRow}. {@link #compareTo(HBaseFragment)} orders by
- * {@code startRow} only, so the ordering is not consistent with equals.
- */
-public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable {
-  @Expose
-  private String tableName;
-  @Expose
-  private String hbaseTableName;
-  @Expose
-  private byte[] startRow;
-  @Expose
-  private byte[] stopRow;
-  @Expose
-  private String regionLocation;
-  @Expose
-  private boolean last;
-  @Expose
-  private long length;
-
-  /**
-   * Creates a fragment from explicit values; {@code last} starts as false and
-   * {@code length} is left at its default of 0.
-   *
-   * @param tableName      value for the {@code tableName} field
-   * @param hbaseTableName value for the {@code hbaseTableName} field
-   * @param startRow       start row bytes (stored by reference, not copied)
-   * @param stopRow        stop row bytes (stored by reference, not copied)
-   * @param regionLocation value reported as the only host by {@link #getHosts()}
-   */
-  public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) {
-    this.tableName = tableName;
-    this.hbaseTableName = hbaseTableName;
-    this.startRow = startRow;
-    this.stopRow = stopRow;
-    this.regionLocation = regionLocation;
-    this.last = false;
-  }
-
-  /**
-   * Creates a fragment from a serialized {@code HBaseFragmentProto}.
-   *
-   * @param raw serialized proto bytes
-   * @throws InvalidProtocolBufferException if {@code raw} cannot be parsed
-   */
-  public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException {
-    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
-    builder.mergeFrom(raw);
-    // Build once; the result is only needed to populate the fields.
-    init(builder.build());
-  }
-
-  /** Copies every field of {@code proto} into this fragment. */
-  private void init(HBaseFragmentProto proto) {
-    this.tableName = proto.getTableName();
-    this.hbaseTableName = proto.getHbaseTableName();
-    this.startRow = proto.getStartRow().toByteArray();
-    this.stopRow = proto.getStopRow().toByteArray();
-    this.regionLocation = proto.getRegionLocation();
-    this.length = proto.getLength();
-    this.last = proto.getLast();
-  }
-
-  /** Orders fragments by their start row bytes, as compared by {@code Bytes.compareTo}. */
-  @Override
-  public int compareTo(HBaseFragment t) {
-    return Bytes.compareTo(startRow, t.startRow);
-  }
-
-  @Override
-  public String getTableName() {
-    return tableName;
-  }
-
-  /**
-   * @return the start row decoded with the platform default charset
-   */
-  @Override
-  public String getKey() {
-    return new String(startRow);
-  }
-
-  /**
-   * @return true when either the start row or the stop row is null
-   */
-  @Override
-  public boolean isEmpty() {
-    return startRow == null || stopRow == null;
-  }
-
-  @Override
-  public long getLength() {
-    return length;
-  }
-
-  public void setLength(long length) {
-    this.length = length;
-  }
-
-  /**
-   * @return a new single-element array holding the region location
-   */
-  @Override
-  public String[] getHosts() {
-    return new String[] {regionLocation};
-  }
-
-  /**
-   * Returns a field-for-field copy. The row arrays are shared with the original,
-   * not duplicated.
-   */
-  @Override
-  public Object clone() throws CloneNotSupportedException {
-    // Object.clone() already copies every field, so no reassignment is needed.
-    return super.clone();
-  }
-
-  /**
-   * Two fragments are equal when their table names and their start and stop rows
-   * (compared byte by byte) are equal.
-   */
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof HBaseFragment)) {
-      return false;
-    }
-    HBaseFragment t = (HBaseFragment) o;
-    return Objects.equal(tableName, t.tableName)
-        && Bytes.equals(startRow, t.startRow)
-        && Bytes.equals(stopRow, t.stopRow);
-  }
-
-  /**
-   * Hashes exactly the fields compared by {@link #equals(Object)}. The row arrays are
-   * hashed by content so that equal fragments always produce equal hash codes.
-   */
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(tableName,
-        java.util.Arrays.hashCode(startRow),
-        java.util.Arrays.hashCode(stopRow));
-  }
-
-  /**
-   * Renders the fragment as a JSON-like string for diagnostics. Row bytes are printed
-   * with {@code Bytes.toStringBinary}; a null row is printed as {@code null}.
-   */
-  @Override
-  public String toString() {
-    return "\"fragment\": {\"tableName\": \"" + tableName + "\", \"hbaseTableName\": \"" + hbaseTableName + "\"" +
-        ", \"startRow\": \"" + rowToString(startRow) + "\"" +
-        ", \"stopRow\": \"" + rowToString(stopRow) + "\"" +
-        ", \"length\": \"" + length + "\"}" ;
-  }
-
-  /** Null-safe printable form of a row key, used only by {@link #toString()}. */
-  private static String rowToString(byte[] row) {
-    return row == null ? "null" : Bytes.toStringBinary(row);
-  }
-
-  /**
-   * Serializes this fragment into a {@link FragmentProto} whose id is the table name,
-   * whose contents are the encoded {@code HBaseFragmentProto}, and whose store type is
-   * {@code StoreType.HBASE}.
-   */
-  @Override
-  public FragmentProto getProto() {
-    HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder();
-    builder.setTableName(tableName)
-        .setHbaseTableName(hbaseTableName)
-        .setStartRow(ByteString.copyFrom(startRow))
-        .setStopRow(ByteString.copyFrom(stopRow))
-        .setLast(last)
-        .setLength(length)
-        .setRegionLocation(regionLocation);
-
-    FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder();
-    fragmentBuilder.setId(this.tableName);
-    fragmentBuilder.setContents(builder.buildPartial().toByteString());
-    fragmentBuilder.setStoreType(StoreType.HBASE.name());
-    return fragmentBuilder.build();
-  }
-
-  public byte[] getStartRow() {
-    return startRow;
-  }
-
-  public byte[] getStopRow() {
-    return stopRow;
-  }
-
-  public String getRegionLocation() {
-    return regionLocation;
-  }
-
-  public boolean isLast() {
-    return last;
-  }
-
-  public void setLast(boolean last) {
-    this.last = last;
-  }
-
-  public String getHbaseTableName() {
-    return hbaseTableName;
-  }
-
-  public void setHbaseTableName(String hbaseTableName) {
-    this.hbaseTableName = hbaseTableName;
-  }
-
-  public void setStartRow(byte[] startRow) {
-    this.startRow = startRow;
-  }
-
-  public void setStopRow(byte[] stopRow) {
-    this.stopRow = stopRow;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
deleted file mode 100644
index 50f61a8..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.hbase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-
-public class HBasePutAppender extends AbstractHBaseAppender {
-  private HTableInterface htable;
-  private long totalNumBytes;
-
-  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
-                          Schema schema, TableMeta meta, Path stagingDir) {
-    super(conf, taskAttemptId, schema, meta, stagingDir);
-  }
-
-  @Override
-  public void init() throws IOException {
-    super.init();
-
-    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
-    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
-        .getConnection(hbaseConf);
-    htable = hconn.getTable(columnMapping.getHbaseTableName());
-    htable.setAutoFlushTo(false);
-    htable.setWriteBufferSize(5 * 1024 * 1024);
-  }
-
-  @Override
-  public void addTuple(Tuple tuple) throws IOException {
-    byte[] rowkey = getRowKeyBytes(tuple);
-    totalNumBytes += rowkey.length;
-    Put put = new Put(rowkey);
-    readKeyValues(tuple, rowkey);
-
-    for (int i = 0; i < columnNum; i++) {
-      if (isRowKeyMappings[i]) {
-        continue;
-      }
-      Datum datum = tuple.get(i);
-      byte[] value;
-      if (isBinaryColumns[i]) {
-        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
-      } else {
-        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
-      }
-
-      if (isColumnKeys[i]) {
-        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
-      } else if (isColumnValues[i]) {
-        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
-      } else {
-        put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
-        totalNumBytes += value.length;
-      }
-    }
-
-    for (int i = 0; i < columnKeyDatas.length; i++) {
-     put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
-      totalNumBytes += columnKeyDatas[i].length + columnValueDatas[i].length;
-    }
-
-    htable.put(put);
-
-    if (enabledStats) {
-      stats.incrementRow();
-      stats.setNumBytes(totalNumBytes);
-    }
-  }
-
-  @Override
-  public void flush() throws IOException {
-    htable.flushCommits();
-  }
-
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    return 0;
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (htable != null) {
-      htable.flushCommits();
-      htable.close();
-    }
-    if (enabledStats) {
-      stats.setNumBytes(totalNumBytes);
-    }
-  }
-}