You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/05 09:21:28 UTC

[24/29] tajo git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into hbase_storage

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index bc3be04,07b47c1..b68eb2e
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@@ -438,28 -427,356 +434,28 @@@ public class Query implements EventHand
        return finalState;
      }
  
-     private boolean finalizeQuery(Query query, QueryCompletedEvent event) {
+     private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
 -      MasterPlan masterPlan = query.getPlan();
 +      SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
 +      StoreType storeType = lastStage.getTableMeta().getStoreType();
 +      try {
 +        LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
 +        CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
 +        TableDesc tableDesc =  PlannerUtil.getTableDesc(catalog, rootNode.getChild());
  
 -      ExecutionBlock terminal = query.getPlan().getTerminalBlock();
 -      DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
 +        Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
 +            .commitOutputData(query.context.getQueryContext(),
 +                lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
  
 -      QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
 -      try {
 -        Path finalOutputDir = commitOutputData(query);
 +        QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
-           hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
-         return true;
+         hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
 -      } catch (Throwable t) {
 -        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t)));
 +      } catch (Exception e) {
-         LOG.error(e.getMessage(), e);
 +        query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
-         return false;
+         return QueryState.QUERY_ERROR;
        }
 -
++      
+       return QueryState.QUERY_SUCCEEDED;
      }
  
 -    /**
 -     * It moves a result data stored in a staging output dir into a final output dir.
 -     */
 -    public Path commitOutputData(Query query) throws IOException {
 -      QueryContext queryContext = query.context.getQueryContext();
 -      Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
 -      Path finalOutputDir;
 -      if (queryContext.hasOutputPath()) {
 -        finalOutputDir = queryContext.getOutputPath();
 -        try {
 -          FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
 -
 -          if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
 -
 -            // It moves the original table into the temporary location.
 -            // Then it moves the new result table into the original table location.
 -            // Upon failed, it recovers the original table if possible.
 -            boolean movedToOldTable = false;
 -            boolean committed = false;
 -            Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
 -
 -            if (queryContext.hasPartition()) {
 -              // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
 -              // renaming directory.
 -              Map<Path, Path> renameDirs = TUtil.newHashMap();
 -              // This is a map for recovering existing partition directory. A key is current directory and a value is
 -              // temporary directory to back up.
 -              Map<Path, Path> recoveryDirs = TUtil.newHashMap();
 -
 -              try {
 -                if (!fs.exists(finalOutputDir)) {
 -                  fs.mkdirs(finalOutputDir);
 -                }
 -
 -                visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
 -                    renameDirs, oldTableDir);
 -
 -                // Rename target partition directories
 -                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
 -                  // Backup existing data files for recovering
 -                  if (fs.exists(entry.getValue())) {
 -                    String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
 -                        oldTableDir.toString());
 -                    Path recoveryPath = new Path(recoveryPathString);
 -                    fs.rename(entry.getValue(), recoveryPath);
 -                    fs.exists(recoveryPath);
 -                    recoveryDirs.put(entry.getValue(), recoveryPath);
 -                  }
 -                  // Delete existing directory
 -                  fs.delete(entry.getValue(), true);
 -                  // Rename staging directory to final output directory
 -                  fs.rename(entry.getKey(), entry.getValue());
 -                }
 -
 -              } catch (IOException ioe) {
 -                // Remove created dirs
 -                for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
 -                  fs.delete(entry.getValue(), true);
 -                }
 -
 -                // Recovery renamed dirs
 -                for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
 -                  fs.delete(entry.getValue(), true);
 -                  fs.rename(entry.getValue(), entry.getKey());
 -                }
 -
 -                throw new IOException(ioe.getMessage());
 -              }
 -            } else { // no partition
 -              try {
 -
 -                // if the final output dir exists, move all contents to the temporary table dir.
 -                // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
 -                if (fs.exists(finalOutputDir)) {
 -                  fs.mkdirs(oldTableDir);
 -
 -                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
 -                    fs.rename(status.getPath(), oldTableDir);
 -                  }
 -
 -                  movedToOldTable = fs.exists(oldTableDir);
 -                } else { // if the parent does not exist, make its parent directory.
 -                  fs.mkdirs(finalOutputDir);
 -                }
 -
 -                // Move the results to the final output dir.
 -                for (FileStatus status : fs.listStatus(stagingResultDir)) {
 -                  fs.rename(status.getPath(), finalOutputDir);
 -                }
 -
 -                // Check the final output dir
 -                committed = fs.exists(finalOutputDir);
 -
 -              } catch (IOException ioe) {
 -                // recover the old table
 -                if (movedToOldTable && !committed) {
 -
 -                  // if commit is failed, recover the old data
 -                  for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
 -                    fs.delete(status.getPath(), true);
 -                  }
 -
 -                  for (FileStatus status : fs.listStatus(oldTableDir)) {
 -                    fs.rename(status.getPath(), finalOutputDir);
 -                  }
 -                }
 -
 -                throw new IOException(ioe.getMessage());
 -              }
 -            }
 -          } else {
 -            NodeType queryType = queryContext.getCommandType();
 -
 -            if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
 -
 -              NumberFormat fmt = NumberFormat.getInstance();
 -              fmt.setGroupingUsed(false);
 -              fmt.setMinimumIntegerDigits(3);
 -
 -              if (queryContext.hasPartition()) {
 -                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
 -                  if (eachFile.isFile()) {
 -                    LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
 -                    continue;
 -                  }
 -                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1);
 -                }
 -              } else {
 -                int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
 -                for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
 -                  moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++);
 -                }
 -              }
 -              // checking all file moved and remove empty dir
 -              verifyAllFileMoved(fs, stagingResultDir);
 -              FileStatus[] files = fs.listStatus(stagingResultDir);
 -              if (files != null && files.length != 0) {
 -                for (FileStatus eachFile: files) {
 -                  LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
 -                }
 -              }
 -            } else { // CREATE TABLE AS SELECT (CTAS)
 -              if (fs.exists(finalOutputDir)) {
 -                for (FileStatus status : fs.listStatus(stagingResultDir)) {
 -                  fs.rename(status.getPath(), finalOutputDir);
 -                }
 -              } else {
 -                fs.rename(stagingResultDir, finalOutputDir);
 -              }
 -              LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
 -            }
 -          }
 -
 -          // remove the staging directory if the final output dir is given.
 -          Path stagingDirRoot = queryContext.getStagingDir().getParent();
 -          fs.delete(stagingDirRoot, true);
 -
 -        } catch (Throwable t) {
 -          LOG.error(t);
 -          throw new IOException(t);
 -        }
 -      } else {
 -        finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
 -      }
 -
 -      return finalOutputDir;
 -    }
 -
 -    /**
 -     * This method sets a rename map which includes renamed staging directory to final output directory recursively.
 -     * If there exists some data files, this delete it for duplicate data.
 -     *
 -     *
 -     * @param fs
 -     * @param stagingPath
 -     * @param outputPath
 -     * @param stagingParentPathString
 -     * @throws IOException
 -     */
 -    private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
 -                                        String stagingParentPathString,
 -                                        Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
 -      FileStatus[] files = fs.listStatus(stagingPath);
 -
 -      for(FileStatus eachFile : files) {
 -        if (eachFile.isDirectory()) {
 -          Path oldPath = eachFile.getPath();
 -
 -          // Make recover directory.
 -          String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
 -          oldTableDir.toString());
 -          Path recoveryPath = new Path(recoverPathString);
 -          if (!fs.exists(recoveryPath)) {
 -            fs.mkdirs(recoveryPath);
 -          }
 -
 -          visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
 -          renameDirs, oldTableDir);
 -          // Find last order partition for renaming
 -          String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
 -          outputPath.toString());
 -          Path newPath = new Path(newPathString);
 -          if (!isLeafDirectory(fs, eachFile.getPath())) {
 -           renameDirs.put(eachFile.getPath(), newPath);
 -          } else {
 -            if (!fs.exists(newPath)) {
 -             fs.mkdirs(newPath);
 -            }
 -          }
 -        }
 -      }
 -    }
 -
 -    private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
 -      boolean retValue = false;
 -
 -      FileStatus[] files = fs.listStatus(path);
 -      for (FileStatus file : files) {
 -        if (fs.isDirectory(file.getPath())) {
 -          retValue = true;
 -          break;
 -        }
 -      }
 -
 -      return retValue;
 -    }
 -
 -    private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
 -      FileStatus[] files = fs.listStatus(stagingPath);
 -      if (files != null && files.length != 0) {
 -        for (FileStatus eachFile: files) {
 -          if (eachFile.isFile()) {
 -            LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
 -            return false;
 -          } else {
 -            if (verifyAllFileMoved(fs, eachFile.getPath())) {
 -              fs.delete(eachFile.getPath(), false);
 -            } else {
 -              return false;
 -            }
 -          }
 -        }
 -      }
 -
 -      return true;
 -    }
 -
 -    /**
 -     * Attach the sequence number to a path.
 -     *
 -     * @param path Path
 -     * @param seq sequence number
 -     * @param nf Number format
 -     * @return New path attached with sequence number
 -     * @throws IOException
 -     */
 -    private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
 -      String[] tokens = path.getName().split("-");
 -      if (tokens.length != 4) {
 -        throw new IOException("Wrong result file name:" + path);
 -      }
 -      return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
 -    }
 -
 -    /**
 -     * Attach the sequence number to the output file name and than move the file into the final result path.
 -     *
 -     * @param fs FileSystem
 -     * @param stagingResultDir The staging result dir
 -     * @param fileStatus The file status
 -     * @param finalOutputPath Final output path
 -     * @param nf Number format
 -     * @param fileSeq The sequence number
 -     * @throws IOException
 -     */
 -    private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
 -                                            FileStatus fileStatus, Path finalOutputPath,
 -                                            NumberFormat nf,
 -                                            int fileSeq) throws IOException {
 -      if (fileStatus.isDirectory()) {
 -        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
 -        if (subPath != null) {
 -          Path finalSubPath = new Path(finalOutputPath, subPath);
 -          if (!fs.exists(finalSubPath)) {
 -            fs.mkdirs(finalSubPath);
 -          }
 -          int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
 -          for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
 -            moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq);
 -          }
 -        } else {
 -          throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
 -        }
 -      } else {
 -        String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
 -        if (subPath != null) {
 -          Path finalSubPath = new Path(finalOutputPath, subPath);
 -          finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
 -          if (!fs.exists(finalSubPath.getParent())) {
 -            fs.mkdirs(finalSubPath.getParent());
 -          }
 -          if (fs.exists(finalSubPath)) {
 -            throw new IOException("Already exists data file:" + finalSubPath);
 -          }
 -          boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
 -          if (success) {
 -            LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
 -                "to final output[" + finalSubPath + "]");
 -          } else {
 -            LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
 -                "to final output[" + finalSubPath + "]");
 -          }
 -        }
 -      }
 -    }
 -
 -    private String extractSubPath(Path parentPath, Path childPath) {
 -      String parentPathStr = parentPath.toUri().getPath();
 -      String childPathStr = childPath.toUri().getPath();
 -
 -      if (parentPathStr.length() > childPathStr.length()) {
 -        return null;
 -      }
 -
 -      int index = childPathStr.indexOf(parentPathStr);
 -      if (index != 0) {
 -        return null;
 -      }
 -
 -      return childPathStr.substring(parentPathStr.length() + 1);
 -    }
 -
      private static interface QueryHook {
        boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
        void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
index 00b95ac,00b95ac..d4e0752
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java
@@@ -24,7 -24,7 +24,7 @@@ import org.apache.tajo.QueryId
  import org.apache.tajo.TajoProtos;
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.QueryInfoProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  import org.apache.tajo.util.TajoIdUtils;
  import org.apache.tajo.util.history.History;
  

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 6b61d04,75d8ab6..1eaef0f
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@@ -62,7 -58,7 +62,8 @@@ import org.apache.tajo.rpc.CallFuture
  import org.apache.tajo.rpc.NettyClientBase;
  import org.apache.tajo.rpc.RpcConnectionPool;
  import org.apache.tajo.storage.StorageManager;
 +import org.apache.tajo.storage.StorageProperty;
+ import org.apache.tajo.storage.StorageUtil;
  import org.apache.tajo.util.HAServiceUtil;
  import org.apache.tajo.util.metrics.TajoMetrics;
  import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
@@@ -476,7 -441,14 +479,20 @@@ public class QueryMasterTask extends Co
      // Create Output Directory
      ////////////////////////////////////////////
  
-     stagingDir = new Path(TajoConf.getStagingDir(conf), queryId);
++    String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
+     if (context.isCreateTable() || context.isInsert()) {
 -      stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
++      if (outputPath == null || outputPath.isEmpty()) {
++        // hbase
++        stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
++      } else {
++        stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
++      }
+     } else {
+       stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+     }
+ 
+     // initializ
+     fs = stagingDir.getFileSystem(conf);
  
      if (fs.exists(stagingDir)) {
        throw new IOException("The staging directory '" + stagingDir + "' already exists");

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 745456a,39bb7ed..7f05fa4
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@@ -57,7 -57,8 +57,9 @@@ import org.apache.tajo.master.TaskRunne
  import org.apache.tajo.master.event.*;
  import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
  import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+ import org.apache.tajo.master.container.TajoContainer;
+ import org.apache.tajo.master.container.TajoContainerId;
 +import org.apache.tajo.storage.FileStorageManager;
  import org.apache.tajo.plan.util.PlannerUtil;
  import org.apache.tajo.plan.logical.*;
  import org.apache.tajo.storage.StorageManager;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
index 7a81b4b,7a81b4b..aaf5754
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java
@@@ -22,7 -22,7 +22,7 @@@ import com.google.gson.annotations.Expo
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto;
  import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
  
  import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
index 556a971,556a971..126e3fe
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryUnitHistory.java
@@@ -20,7 -20,7 +20,7 @@@ package org.apache.tajo.util.history
  
  import com.google.gson.annotations.Expose;
  import org.apache.tajo.engine.json.CoreGsonHelper;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  
  public class QueryUnitHistory implements GsonObject {
    @Expose private String id;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
----------------------------------------------------------------------
diff --cc tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
index b3ac4d2,b3ac4d2..17b9ec7
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java
@@@ -22,7 -22,7 +22,7 @@@ import com.google.gson.annotations.Expo
  import com.google.gson.reflect.TypeToken;
  import org.apache.tajo.engine.json.CoreGsonHelper;
  import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto;
--import org.apache.tajo.json.GsonObject;
++import org.apache.tajo.storage.json.GsonObject;
  
  import java.util.ArrayList;
  import java.util.List;

http://git-wip-us.apache.org/repos/asf/tajo/blob/940546a0/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
index cb9aa74,0000000..db8eb84
mode 100644,000000..100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestHBaseTable.java
@@@ -1,1474 -1,0 +1,1469 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.tajo.engine.query;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.fs.FileStatus;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hbase.*;
 +import org.apache.hadoop.hbase.client.*;
 +import org.apache.hadoop.hbase.filter.Filter;
 +import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
 +import org.apache.tajo.IntegrationTest;
 +import org.apache.tajo.QueryTestCaseBase;
 +import org.apache.tajo.TajoTestingCluster;
 +import org.apache.tajo.catalog.Schema;
 +import org.apache.tajo.catalog.TableDesc;
 +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 +import org.apache.tajo.common.TajoDataTypes.Type;
 +import org.apache.tajo.datum.TextDatum;
 +import org.apache.tajo.plan.expr.*;
 +import org.apache.tajo.plan.logical.ScanNode;
 +import org.apache.tajo.storage.StorageConstants;
 +import org.apache.tajo.storage.StorageManager;
 +import org.apache.tajo.storage.fragment.Fragment;
 +import org.apache.tajo.storage.hbase.*;
 +import org.apache.tajo.util.Bytes;
 +import org.apache.tajo.util.KeyValueSet;
 +import org.apache.tajo.util.TUtil;
 +import org.junit.AfterClass;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.experimental.categories.Category;
 +
 +import java.net.InetAddress;
 +import java.sql.ResultSet;
 +import java.text.DecimalFormat;
 +import java.util.*;
 +
 +import static org.junit.Assert.*;
 +import static org.junit.Assert.assertEquals;
 +
 +@Category(IntegrationTest.class)
 +public class TestHBaseTable extends QueryTestCaseBase {
 +  private static final Log LOG = LogFactory.getLog(TestHBaseTable.class);
 +
 +  @BeforeClass
 +  public static void beforeClass() {
 +    try {
 +      testingCluster.getHBaseUtil().startHBaseCluster();
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +    }
 +  }
 +
 +  @AfterClass
 +  public static void afterClass() {
 +    try {
 +      testingCluster.getHBaseUtil().stopHBaseCluster();
 +    } catch (Exception e) {
 +      e.printStackTrace();
 +    }
 +  }
 +
 +  @Test
 +  public void testVerifyCreateHBaseTableRequiredMeta() throws Exception {
 +    try {
 +      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
 +          "USING hbase").close();
 +
 +      fail("hbase table must have 'table' meta");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0);
 +    }
 +
 +    try {
 +      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
 +          "USING hbase " +
 +          "WITH ('table'='hbase_table')").close();
 +
 +      fail("hbase table must have 'columns' meta");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("'columns' property is required") >= 0);
 +    }
 +
 +    try {
 +      executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text) " +
 +          "USING hbase " +
 +          "WITH ('table'='hbase_table', 'columns'='col1:,col2:')").close();
 +
 +      fail("hbase table must have 'hbase.zookeeper.quorum' meta");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("HBase mapped table") >= 0);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateHBaseTable() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table1");
 +
 +    HTableDescriptor hTableDesc = testingCluster.getHBaseUtil().getTableDescriptor("hbase_table");
 +    assertNotNull(hTableDesc);
 +    assertEquals("hbase_table", hTableDesc.getNameAsString());
 +
 +    HColumnDescriptor[] hColumns = hTableDesc.getColumnFamilies();
 +    // col1 is mapped to rowkey
 +    assertEquals(2, hColumns.length);
 +    assertEquals("col2", hColumns[0].getNameAsString());
 +    assertEquals("col3", hColumns[1].getNameAsString());
 +
 +    executeString("DROP TABLE hbase_mapped_table1 PURGE").close();
 +
 +    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    try {
 +      assertFalse(hAdmin.tableExists("hbase_table"));
 +    } finally {
 +      hAdmin.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateNotExistsExternalHBaseTable() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    try {
 +      executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table1 (col1 text, col2 text, col3 text, col4 text) " +
 +          "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:a,col3:,col2:b', " +
 +          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +      fail("External table should be a existed table.");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("External table should be a existed table.") >= 0);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateRowFieldWithNonText() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    try {
 +      executeString("CREATE TABLE hbase_mapped_table2 (rk1 int4, rk2 text, col3 text, col4 text) " +
 +          "USING hbase WITH ('table'='hbase_table', 'columns'='0:key#b,1:key,col3:,col2:b', " +
 +          "'hbase.rowkey.delimiter'='_', " +
 +          "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +          "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +      fail("Key field type should be TEXT type");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("Key field type should be TEXT type") >= 0);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateExternalHBaseTable() throws Exception {
 +    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table_not_purge"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col1"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col2"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col3"));
 +    testingCluster.getHBaseUtil().createTable(hTableDesc);
 +
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
 +        "USING hbase WITH ('table'='external_hbase_table_not_purge', 'columns'=':key,col1:a,col2:,col3:b', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("external_hbase_mapped_table");
 +
 +    executeString("DROP TABLE external_hbase_mapped_table").close();
 +
 +    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    try {
 +      assertTrue(hAdmin.tableExists("external_hbase_table_not_purge"));
 +      hAdmin.disableTable("external_hbase_table_not_purge");
 +      hAdmin.deleteTable("external_hbase_table_not_purge");
 +    } finally {
 +      hAdmin.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testSimpleSelectQuery() throws Exception {
 +    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col1"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col2"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col3"));
 +    testingCluster.getHBaseUtil().createTable(hTableDesc);
 +
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
 +        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("external_hbase_mapped_table");
 +
 +    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
 +        .getConnection(testingCluster.getHBaseUtil().getConf());
 +    HTableInterface htable = hconn.getTable("external_hbase_table");
 +
 +    try {
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(String.valueOf(i).getBytes());
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > '20'");
 +      assertResultSet(res);
 +      cleanupQuery(res);
 +    } finally {
 +      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
 +      htable.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testBinaryMappedQuery() throws Exception {
 +    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col1"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col2"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col3"));
 +    testingCluster.getHBaseUtil().createTable(hTableDesc);
 +
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk int8, col1 text, col2 text, col3 int4)\n " +
 +        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key#b,col1:a,col2:,col3:b#b', \n" +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "', \n" +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("external_hbase_mapped_table");
 +
 +    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
 +        .getConnection(testingCluster.getHBaseUtil().getConf());
 +    HTableInterface htable = hconn.getTable("external_hbase_table");
 +
 +    try {
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(Bytes.toBytes((long) i));
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes(i));
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select * from external_hbase_mapped_table where rk > 20");
 +      assertResultSet(res);
 +      res.close();
 +
 +      //Projection
 +      res = executeString("select col3, col2, rk from external_hbase_mapped_table where rk > 95");
 +
 +      String expected = "col3,col2,rk\n" +
 +          "-------------------------------\n" +
 +          "96,{\"k1\":\"k1-96\", \"k2\":\"k2-96\"},96\n" +
 +          "97,{\"k1\":\"k1-97\", \"k2\":\"k2-97\"},97\n" +
 +          "98,{\"k1\":\"k1-98\", \"k2\":\"k2-98\"},98\n" +
 +          "99,{\"k1\":\"k1-99\", \"k2\":\"k2-99\"},99\n";
 +
 +      assertEquals(expected, resultSetToString(res));
 +      res.close();
 +
 +    } finally {
 +      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
 +      htable.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testColumnKeyValueSelectQuery() throws Exception {
 +    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col2"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col3"));
 +    testingCluster.getHBaseUtil().createTable(hTableDesc);
 +
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, col2_key text, col2_value text, col3 text) " +
 +        "USING hbase WITH ('table'='external_hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("external_hbase_mapped_table");
 +
 +    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
 +        .getConnection(testingCluster.getHBaseUtil().getConf());
 +    HTableInterface htable = hconn.getTable("external_hbase_table");
 +
 +    try {
 +      for (int i = 0; i < 10; i++) {
 +        Put put = new Put(Bytes.toBytes("rk-" + i));
 +        for (int j = 0; j < 5; j++) {
 +          put.add("col2".getBytes(), ("key-" + j).getBytes(), Bytes.toBytes("value-" + j));
 +        }
 +        put.add("col3".getBytes(), "".getBytes(), ("col3-value-" + i).getBytes());
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 >= 'rk-0'");
 +      assertResultSet(res);
 +      cleanupQuery(res);
 +    } finally {
 +      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
 +      htable.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testRowFieldSelectQuery() throws Exception {
 +    HTableDescriptor hTableDesc = new HTableDescriptor(TableName.valueOf("external_hbase_table"));
 +    hTableDesc.addFamily(new HColumnDescriptor("col3"));
 +    testingCluster.getHBaseUtil().createTable(hTableDesc);
 +
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE EXTERNAL TABLE external_hbase_mapped_table (rk1 text, rk2 text, col3 text) " +
 +        "USING hbase WITH ('table'='external_hbase_table', 'columns'='0:key,1:key,col3:a', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("external_hbase_mapped_table");
 +
 +    HConnection hconn = ((HBaseStorageManager)StorageManager.getStorageManager(conf, StoreType.HBASE))
 +        .getConnection(testingCluster.getHBaseUtil().getConf());
 +    HTableInterface htable = hconn.getTable("external_hbase_table");
 +
 +    try {
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(("field1-" + i + "_field2-" + i).getBytes());
 +        put.add("col3".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select * from external_hbase_mapped_table where rk1 > 'field1-20'");
 +      assertResultSet(res);
 +      cleanupQuery(res);
 +    } finally {
 +      executeString("DROP TABLE external_hbase_mapped_table PURGE").close();
 +      htable.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testIndexPredication() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +
 +    assertTableExists("hbase_mapped_table");
 +    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    hAdmin.tableExists("hbase_table");
 +
 +    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +    try {
 +      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
 +      assertEquals(5, keys.getFirst().length);
 +
 +      DecimalFormat df = new DecimalFormat("000");
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(String.valueOf(df.format(i)).getBytes());
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        htable.put(put);
 +      }
 +      assertIndexPredication(false);
 +
 +      ResultSet res = executeString("select * from hbase_mapped_table where rk >= '020' and rk <= '055'");
 +      assertResultSet(res);
 +      res.close();
 +
 +      res = executeString("select * from hbase_mapped_table where rk = '021'");
 +      String expected = "rk,col1,col2,col3\n" +
 +          "-------------------------------\n" +
 +          "021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
 +
 +      assertEquals(expected, resultSetToString(res));
 +      res.close();
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +      htable.close();
 +      hAdmin.close();
 +    }
 +  }
 +
 +  @Test
 +  public void testCompositeRowIndexPredication() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, rk2 text, col1 text, col2 text, col3 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +
 +    assertTableExists("hbase_mapped_table");
 +    HBaseAdmin hAdmin = new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    hAdmin.tableExists("hbase_table");
 +
 +    HTable htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +    try {
 +      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
 +      assertEquals(5, keys.getFirst().length);
 +
 +      DecimalFormat df = new DecimalFormat("000");
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put((df.format(i) + "_" + df.format(i)).getBytes());
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        htable.put(put);
 +      }
 +
 +      Scan scan = new Scan();
 +      scan.setStartRow("021".getBytes());
 +      scan.setStopRow(("021_" + new String(new char[]{Character.MAX_VALUE})).getBytes());
 +      Filter filter = new InclusiveStopFilter(scan.getStopRow());
 +      scan.setFilter(filter);
 +
 +      ResultScanner scanner = htable.getScanner(scan);
 +      Result result = scanner.next();
 +      assertNotNull(result);
 +      assertEquals("021_021", new String(result.getRow()));
 +      scanner.close();
 +
 +      assertIndexPredication(true);
 +
 +      ResultSet res = executeString("select * from hbase_mapped_table where rk = '021'");
 +      String expected = "rk,rk2,col1,col2,col3\n" +
 +          "-------------------------------\n" +
 +          "021,021,a-21,{\"k1\":\"k1-21\", \"k2\":\"k2-21\"},b-21\n";
 +
 +      assertEquals(expected, resultSetToString(res));
 +      res.close();
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +      htable.close();
 +      hAdmin.close();
 +    }
 +  }
 +
 +  private void assertIndexPredication(boolean isCompositeRowKey) throws Exception {
 +    String postFix = isCompositeRowKey ? "_" + new String(new char[]{Character.MAX_VALUE}) : "";
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    ScanNode scanNode = new ScanNode(1);
 +
 +    // where rk = '021'
 +    EvalNode evalNodeEq = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("021")));
 +    scanNode.setQual(evalNodeEq);
 +    StorageManager storageManager = StorageManager.getStorageManager(conf, StoreType.HBASE);
 +    List<Fragment> fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
 +    assertEquals(1, fragments.size());
 +    assertEquals("021", new String(((HBaseFragment)fragments.get(0)).getStartRow()));
 +    assertEquals("021" + postFix, new String(((HBaseFragment)fragments.get(0)).getStopRow()));
 +
 +    // where rk >= '020' and rk <= '055'
 +    EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("020")));
 +    EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("055")));
 +    EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2);
 +    scanNode.setQual(evalNodeA);
 +
 +    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
 +    assertEquals(2, fragments.size());
 +    HBaseFragment fragment1 = (HBaseFragment) fragments.get(0);
 +    assertEquals("020", new String(fragment1.getStartRow()));
 +    assertEquals("040", new String(fragment1.getStopRow()));
 +
 +    HBaseFragment fragment2 = (HBaseFragment) fragments.get(1);
 +    assertEquals("040", new String(fragment2.getStartRow()));
 +    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
 +
 +    // where (rk >= '020' and rk <= '055') or rk = '075'
 +    EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("075")));
 +    EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3);
 +    scanNode.setQual(evalNodeB);
 +    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
 +    assertEquals(3, fragments.size());
 +    fragment1 = (HBaseFragment) fragments.get(0);
 +    assertEquals("020", new String(fragment1.getStartRow()));
 +    assertEquals("040", new String(fragment1.getStopRow()));
 +
 +    fragment2 = (HBaseFragment) fragments.get(1);
 +    assertEquals("040", new String(fragment2.getStartRow()));
 +    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
 +
 +    HBaseFragment fragment3 = (HBaseFragment) fragments.get(2);
 +    assertEquals("075", new String(fragment3.getStartRow()));
 +    assertEquals("075" + postFix, new String(fragment3.getStopRow()));
 +
 +
 +    // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078')
 +    EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("072")));
 +    EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("078")));
 +    EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
 +    EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
 +    scanNode.setQual(evalNodeD);
 +    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
 +    assertEquals(3, fragments.size());
 +
 +    fragment1 = (HBaseFragment) fragments.get(0);
 +    assertEquals("020", new String(fragment1.getStartRow()));
 +    assertEquals("040", new String(fragment1.getStopRow()));
 +
 +    fragment2 = (HBaseFragment) fragments.get(1);
 +    assertEquals("040", new String(fragment2.getStartRow()));
 +    assertEquals("055" + postFix, new String(fragment2.getStopRow()));
 +
 +    fragment3 = (HBaseFragment) fragments.get(2);
 +    assertEquals("072", new String(fragment3.getStartRow()));
 +    assertEquals("078" + postFix, new String(fragment3.getStopRow()));
 +
 +    // where (rk >= '020' and rk <= '055') or (rk >= '057' and rk <= '059')
 +    evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("057")));
 +    evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(tableDesc.getLogicalSchema().getColumn("rk")),
 +        new ConstEval(new TextDatum("059")));
 +    evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5);
 +    evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC);
 +    scanNode.setQual(evalNodeD);
 +    fragments = storageManager.getSplits("hbase_mapped_table", tableDesc, scanNode);
 +    assertEquals(2, fragments.size());
 +
 +    fragment1 = (HBaseFragment) fragments.get(0);
 +    assertEquals("020", new String(fragment1.getStartRow()));
 +    assertEquals("040", new String(fragment1.getStopRow()));
 +
 +    fragment2 = (HBaseFragment) fragments.get(1);
 +    assertEquals("040", new String(fragment2.getStartRow()));
 +    assertEquals("059" + postFix, new String(fragment2.getStopRow()));
 +  }
 +
 +  @Test
 +  public void testNonForwardQuery() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:#b', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +
 +    assertTableExists("hbase_mapped_table");
 +    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    HTable htable = null;
 +    try {
 +      hAdmin.tableExists("hbase_table");
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
 +      assertEquals(5, keys.getFirst().length);
 +
 +      DecimalFormat df = new DecimalFormat("000");
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(String.valueOf(df.format(i)).getBytes());
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "".getBytes(), Bytes.toBytes(i));
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select * from hbase_mapped_table");
 +      assertResultSet(res);
 +      res.close();
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +      hAdmin.close();
 +      if (htable == null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testJoin() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int8) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +
 +    assertTableExists("hbase_mapped_table");
 +    HBaseAdmin hAdmin =  new HBaseAdmin(testingCluster.getHBaseUtil().getConf());
 +    HTable htable = null;
 +    try {
 +      hAdmin.tableExists("hbase_table");
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +      org.apache.hadoop.hbase.util.Pair<byte[][], byte[][]> keys = htable.getStartEndKeys();
 +      assertEquals(5, keys.getFirst().length);
 +
 +      DecimalFormat df = new DecimalFormat("000");
 +      for (int i = 0; i < 100; i++) {
 +        Put put = new Put(String.valueOf(df.format(i)).getBytes());
 +        put.add("col1".getBytes(), "a".getBytes(), ("a-" + i).getBytes());
 +        put.add("col1".getBytes(), "b".getBytes(), ("b-" + i).getBytes());
 +        put.add("col2".getBytes(), "k1".getBytes(), ("k1-" + i).getBytes());
 +        put.add("col2".getBytes(), "k2".getBytes(), ("k2-" + i).getBytes());
 +        put.add("col3".getBytes(), "b".getBytes(), Bytes.toBytes((long) i));
 +        htable.put(put);
 +      }
 +
 +      ResultSet res = executeString("select a.rk, a.col1, a.col2, a.col3, b.l_orderkey, b.l_linestatus " +
 +          "from hbase_mapped_table a " +
 +          "join default.lineitem b on a.col3 = b.l_orderkey");
 +      assertResultSet(res);
 +      res.close();
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +      hAdmin.close();
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertInto() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scan.addFamily(Bytes.toBytes("col2"));
 +      scan.addFamily(Bytes.toBytes("col3"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
 +          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
 +          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoMultiRegion() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.TEXT);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    DecimalFormat df = new DecimalFormat("000");
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(df.format(i) + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select id, name from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1")},
 +          new byte[][]{null, Bytes.toBytes("a")},
 +          new boolean[]{false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoMultiRegion2() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
 +        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.TEXT);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(i + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select id, name from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1")},
 +          new byte[][]{null, Bytes.toBytes("a")},
 +          new boolean[]{false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoMultiRegionWithSplitFile() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    String splitFilePath = currentDatasetPath + "/splits.data";
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
 +        "'hbase.split.rowkeys.file'='" + splitFilePath + "', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.TEXT);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    DecimalFormat df = new DecimalFormat("000");
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(df.format(i) + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select id, name from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1")},
 +          new byte[][]{null, Bytes.toBytes("a")},
 +          new boolean[]{false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoMultiRegionMultiRowFields() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a', " +
 +        "'hbase.split.rowkeys'='001,002,003,004,005,006,007,008,009', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id1", Type.TEXT);
 +    schema.addColumn("id2", Type.TEXT);
 +    schema.addColumn("name", Type.TEXT);
 +    DecimalFormat df = new DecimalFormat("000");
 +    List<String> datas = new ArrayList<String>();
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(df.format(i) + "|" + (i + 100) + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select id1, id2, name from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, null, Bytes.toBytes("col1")},
 +          new byte[][]{null, null, Bytes.toBytes("a")},
 +          new boolean[]{false, false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoBinaryMultiRegion() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk int4, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key#b,col1:a', " +
 +        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.INT4);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(i + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select id, name from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1")},
 +          new byte[][]{null, Bytes.toBytes("a")},
 +          new boolean[]{true, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoColumnKeyValue() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col2_key text, col2_value text, col3 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col2:key:,col2:value:,col3:', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("rk", Type.TEXT);
 +    schema.addColumn("col2_key", Type.TEXT);
 +    schema.addColumn("col2_value", Type.TEXT);
 +    schema.addColumn("col3", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    for (int i = 20; i >= 0; i--) {
 +      for (int j = 0; j < 3; j++) {
 +        datas.add(i + "|ck-" + j + "|value-" + j + "|col3-" + i);
 +      }
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select rk, col2_key, col2_value, col3 from base_table ").close();
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col2"));
 +      scan.addFamily(Bytes.toBytes("col3"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col2"), Bytes.toBytes("col3")},
 +          new byte[][]{null, null, null},
 +          new boolean[]{false, false, false}, tableDesc.getSchema()));
 +
 +      ResultSet res = executeString("select * from hbase_mapped_table");
 +
 +      String expected = "rk,col2_key,col2_value,col3\n" +
 +          "-------------------------------\n" +
 +          "0,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-0\n" +
 +          "1,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-1\n" +
 +          "10,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-10\n" +
 +          "11,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-11\n" +
 +          "12,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-12\n" +
 +          "13,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-13\n" +
 +          "14,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-14\n" +
 +          "15,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-15\n" +
 +          "16,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-16\n" +
 +          "17,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-17\n" +
 +          "18,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-18\n" +
 +          "19,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-19\n" +
 +          "2,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-2\n" +
 +          "20,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-20\n" +
 +          "3,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-3\n" +
 +          "4,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-4\n" +
 +          "5,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-5\n" +
 +          "6,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-6\n" +
 +          "7,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-7\n" +
 +          "8,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-8\n" +
 +          "9,[\"ck-0\", \"ck-1\", \"ck-2\"],[\"value-0\", \"value-1\", \"value-2\"],col3-9\n";
 +
 +      assertEquals(expected, resultSetToString(res));
 +      res.close();
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoDifferentType() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
 +        "'hbase.split.rowkeys'='1,2,3,4,5,6,7,8,9', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.INT4);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(i + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    try {
 +      executeString("insert into hbase_mapped_table " +
 +          "select id, name from base_table ").close();
 +      fail("If inserting data type different with target table data type, should throw exception");
 +    } catch (Exception e) {
 +      assertTrue(e.getMessage().indexOf("VerifyException") >= 0);
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoRowField() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk1 text, rk2 text, col1 text, col2 text, col3 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'='0:key,1:key,col1:a,col2:,col3:b', " +
 +        "'hbase.rowkey.delimiter'='_', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    executeString("insert into hbase_mapped_table " +
 +        "select l_orderkey::text, l_partkey::text, l_shipdate, l_returnflag, l_suppkey::text from default.lineitem ");
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scan.addFamily(Bytes.toBytes("col2"));
 +      scan.addFamily(Bytes.toBytes("col3"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
 +          new byte[][]{null, Bytes.toBytes("a"), Bytes.toBytes(""), Bytes.toBytes("b")},
 +          new boolean[]{false, false, false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testCATS() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    // create test table
 +    KeyValueSet tableOptions = new KeyValueSet();
 +    tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +    tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +    Schema schema = new Schema();
 +    schema.addColumn("id", Type.TEXT);
 +    schema.addColumn("name", Type.TEXT);
 +    List<String> datas = new ArrayList<String>();
 +    DecimalFormat df = new DecimalFormat("000");
 +    for (int i = 99; i >= 0; i--) {
 +      datas.add(df.format(i) + "|value" + i);
 +    }
 +    TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +        schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
 +        " as " +
 +        "select id, name from base_table"
 +    ).close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scanner = htable.getScanner(scan);
 +
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1")},
 +          new byte[][]{null, Bytes.toBytes("a")},
 +          new boolean[]{false, false}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoUsingPut() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text, col3 int4) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:,col3:b#b', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +    TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), "hbase_mapped_table");
 +
 +    Map<String, String> sessions = new HashMap<String, String>();
 +    sessions.put(HBaseStorageConstants.INSERT_PUT_MODE, "true");
 +    client.updateSessionVariables(sessions);
 +
 +    HTable htable = null;
 +    ResultScanner scanner = null;
 +    try {
 +      executeString("insert into hbase_mapped_table " +
 +          "select l_orderkey::text, l_shipdate, l_returnflag, l_suppkey from default.lineitem ").close();
 +
 +      htable = new HTable(testingCluster.getHBaseUtil().getConf(), "hbase_table");
 +
 +      Scan scan = new Scan();
 +      scan.addFamily(Bytes.toBytes("col1"));
 +      scan.addFamily(Bytes.toBytes("col2"));
 +      scan.addFamily(Bytes.toBytes("col3"));
 +      scanner = htable.getScanner(scan);
 +
 +      // result is dirrerent with testInsertInto because l_orderkey is not unique.
 +      assertStrings(resultSetToString(scanner,
 +          new byte[][]{null, Bytes.toBytes("col1"), Bytes.toBytes("col2"), Bytes.toBytes("col3")},
 +          new byte[][]{null, Bytes.toBytes("a"), null, Bytes.toBytes("b")},
 +          new boolean[]{false, false, false, true}, tableDesc.getSchema()));
 +
 +    } finally {
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +
 +      client.unsetSessionVariables(TUtil.newList(HBaseStorageConstants.INSERT_PUT_MODE));
 +
 +      if (scanner != null) {
 +        scanner.close();
 +      }
 +
 +      if (htable != null) {
 +        htable.close();
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testInsertIntoLocation() throws Exception {
 +    String hostName = InetAddress.getLocalHost().getHostName();
 +    String zkPort = testingCluster.getHBaseUtil().getConf().get(HConstants.ZOOKEEPER_CLIENT_PORT);
 +    assertNotNull(zkPort);
 +
 +    executeString("CREATE TABLE hbase_mapped_table (rk text, col1 text, col2 text) " +
 +        "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
 +        "'hbase.split.rowkeys'='010,040,060,080', " +
 +        "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
 +        "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')").close();
 +
 +    assertTableExists("hbase_mapped_table");
 +
 +    try {
 +      // create test table
 +      KeyValueSet tableOptions = new KeyValueSet();
 +      tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 +      tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 +
 +      Schema schema = new Schema();
 +      schema.addColumn("id", Type.TEXT);
 +      schema.addColumn("name", Type.TEXT);
 +      schema.addColumn("comment", Type.TEXT);
 +      List<String> datas = new ArrayList<String>();
 +      DecimalFormat df = new DecimalFormat("000");
 +      for (int i = 99; i >= 0; i--) {
 +        datas.add(df.format(i) + "|value" + i + "|comment-" + i);
 +      }
 +      TajoTestingCluster.createTable(getCurrentDatabase() + ".base_table",
 +          schema, tableOptions, datas.toArray(new String[]{}), 2);
 +
 +      executeString("insert into location '/tmp/hfile_test' " +
-           "USING hbase WITH ('table'='hbase_table', 'columns'=':key,col1:a,col2:', " +
-           "'" + HConstants.ZOOKEEPER_QUORUM + "'='" + hostName + "'," +
-           "'" + HConstants.ZOOKEEPER_CLIENT_PORT + "'='" + zkPort + "')" +
 +          "select id, name, comment from base_table ").close();
 +
 +      FileSystem fs = testingCluster.getDefaultFileSystem();
 +      Path path = new Path("/tmp/hfile_test");
 +      assertTrue(fs.exists(path));
 +
 +      FileStatus[] files = fs.listStatus(path);
 +      assertNotNull(files);
 +      assertEquals(2, files.length);
 +
-       assertEquals("/tmp/hfile_test/col2", files[1].getPath().toUri().getPath());
- 
-       int index = 1;
++      int index = 0;
 +      for (FileStatus eachFile: files) {
-         assertEquals("/tmp/hfile_test/col" + index, eachFile.getPath().toUri().getPath());
++        assertEquals("/tmp/hfile_test/part-01-00000" + index + "-00" + index, eachFile.getPath().toUri().getPath());
 +        for (FileStatus subFile: fs.listStatus(eachFile.getPath())) {
 +          assertTrue(subFile.isFile());
 +          assertTrue(subFile.getLen() > 0);
 +        }
 +        index++;
 +      }
 +    } finally {
 +      executeString("DROP TABLE base_table PURGE").close();
 +      executeString("DROP TABLE hbase_mapped_table PURGE").close();
 +    }
 +  }
 +
 +  private String resultSetToString(ResultScanner scanner,
 +                                   byte[][] cfNames, byte[][] qualifiers,
 +                                   boolean[] binaries,
 +                                   Schema schema) throws Exception {
 +    StringBuilder sb = new StringBuilder();
 +    Result result = null;
 +    while ( (result = scanner.next()) != null ) {
 +      if (binaries[0]) {
 +        sb.append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(0), result.getRow()).asChar());
 +      } else {
 +        sb.append(new String(result.getRow()));
 +      }
 +
 +      for (int i = 0; i < cfNames.length; i++) {
 +        if (cfNames[i] == null) {
 +          //rowkey
 +          continue;
 +        }
 +        if (qualifiers[i] == null) {
 +          Map<byte[], byte[]> values = result.getFamilyMap(cfNames[i]);
 +          if (values == null) {
 +            sb.append(", null");
 +          } else {
 +            sb.append(", {");
 +            String delim = "";
 +            for (Map.Entry<byte[], byte[]> valueEntry: values.entrySet()) {
 +              byte[] keyBytes = valueEntry.getKey();
 +              byte[] valueBytes = valueEntry.getValue();
 +
 +              if (binaries[i]) {
 +                sb.append(delim).append("\"").append(keyBytes == null ? "" : Bytes.toLong(keyBytes)).append("\"");
 +                sb.append(": \"").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
 +              } else {
 +                sb.append(delim).append("\"").append(keyBytes == null ? "" : new String(keyBytes)).append("\"");
 +                sb.append(": \"").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), valueBytes)).append("\"");
 +              }
 +              delim = ", ";
 +            }
 +            sb.append("}");
 +          }
 +        } else {
 +          byte[] value = result.getValue(cfNames[i], qualifiers[i]);
 +          if (value == null) {
 +            sb.append(", null");
 +          } else {
 +            if (binaries[i]) {
 +              sb.append(", ").append(HBaseBinarySerializerDeserializer.deserialize(schema.getColumn(i), value));
 +            } else {
 +              sb.append(", ").append(HBaseTextSerializerDeserializer.deserialize(schema.getColumn(i), value));
 +            }
 +          }
 +        }
 +      }
 +      sb.append("\n");
 +    }
 +
 +    return sb.toString();
 +  }
 +}