You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/03/25 02:36:27 UTC

[12/13] TAJO-353: Add Database support to Tajo. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
index e22e8bc..0bb7e32 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchIndexException.java
@@ -21,31 +21,14 @@ package org.apache.tajo.catalog.exception;
 public class NoSuchIndexException extends CatalogException {
   private static final long serialVersionUID = 3705839985189534673L;
 
-  /**
-   * 
-   */
   public NoSuchIndexException() {
   }
 
-  /**
-   * @param message
-   */
-  public NoSuchIndexException(String message) {
-    super(message);
+  public NoSuchIndexException(String databaseName, String columnName) { // name is quoted unpadded, as in the one-arg form
+    super(String.format("ERROR: index \"%s\" in %s does not exist", columnName, databaseName));
   }
 
-  /**
-   * @param cause
-   */
-  public NoSuchIndexException(Throwable cause) {
-    super(cause);
-  }
-
-  /**
-   * @param message
-   * @param cause
-   */
-  public NoSuchIndexException(String message, Throwable cause) {
-    super(message, cause);
+  public NoSuchIndexException(String indexName) {
+    super("ERROR: index \"" + indexName + "\" does not exist");
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTableException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTableException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTableException.java
index 80b3b9f..cc11444 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTableException.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTableException.java
@@ -24,6 +24,10 @@ public class NoSuchTableException extends CatalogException {
 
 	public NoSuchTableException() {}
 
+  public NoSuchTableException(String databaseName, String relName) { // name is quoted unpadded, as in the one-arg form
+    super(String.format("ERROR: relation \"%s\" in %s does not exist", relName, databaseName));
+  }
+
 	public NoSuchTableException(String relName) {
 		super("ERROR: relation \"" + relName + "\" does not exist");
 	}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTablespaceException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTablespaceException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTablespaceException.java
new file mode 100644
index 0000000..8b7d80b
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchTablespaceException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+public class NoSuchTablespaceException extends CatalogException {
+	private static final long serialVersionUID = 277182608283894937L;
+
+	public NoSuchTablespaceException() {}
+
+	public NoSuchTablespaceException(String spaceName) {
+		super("ERROR: tablespace \"" + spaceName + "\" does not exist");
+	}
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
index e89ee72..d775ba8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -29,8 +29,6 @@ import org.apache.tajo.json.GsonObject;
  * <code>PartitionDesc</code> presents a table partition.
  */
 public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescProto>, Cloneable, GsonObject {
-
-  @Expose protected String tableId;                            // required
   @Expose protected String partitionName;                      // optional
   @Expose protected int ordinalPosition;                       // required
   @Expose protected String partitionValue;                     // optional
@@ -42,7 +40,6 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
   }
 
   public PartitionDesc(PartitionDesc partition) {
-    this.tableId = partition.tableId;
     this.partitionName = partition.partitionName;
     this.ordinalPosition = partition.ordinalPosition;
     this.partitionValue = partition.partitionValue;
@@ -50,7 +47,6 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
   }
 
   public PartitionDesc(CatalogProtos.PartitionDescProto proto) {
-    this.tableId = proto.getTableId();
     if(proto.hasPartitionName()) {
       this.partitionName = proto.getPartitionName();
     }
@@ -63,57 +59,43 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
     }
   }
 
+  public void setName(String partitionName) {
+    this.partitionName = partitionName;
+  }
   public String getName() {
     return partitionName;
   }
 
-  public String getTableId() {
-    return tableId;
-  }
 
+  public void setOrdinalPosition(int ordinalPosition) {
+    this.ordinalPosition = ordinalPosition;
+  }
   public int getOrdinalPosition() {
     return ordinalPosition;
   }
 
-  public String getPartitionValue() {
-    return partitionValue;
-  }
-
   public void setPartitionValue(String partitionValue) {
     this.partitionValue = partitionValue;
   }
-
-  public String getPath() {
-    return path;
-  }
-
-
-  public void setTable(String tableId) {
-    this.tableId = tableId;
-  }
-
-  public void setName(String partitionName) {
-    this.partitionName = partitionName;
-  }
-
-
-  public void setOrdinalPosition(int ordinalPosition) {
-    this.ordinalPosition = ordinalPosition;
+  public String getPartitionValue() {
+    return partitionValue;
   }
 
   public void setPath(String path) {
     this.path = path;
   }
+  public String getPath() {
+    return path;
+  }
 
   public int hashCode() {
-    return Objects.hashCode(tableId, partitionName, ordinalPosition, partitionValue, path);
+    return Objects.hashCode(partitionName, ordinalPosition, partitionValue, path);
   }
 
   public boolean equals(Object o) {
     if (o instanceof PartitionDesc) {
       PartitionDesc another = (PartitionDesc) o;
-      boolean eq = tableId.equals(another.tableId);
-      eq = eq && ((partitionName != null && another.partitionName != null
+      boolean eq = ((partitionName != null && another.partitionName != null
           && partitionName.equals(another.partitionName)) ||
           (partitionName == null && another.partitionName == null));
       eq = eq && (ordinalPosition == another.ordinalPosition);
@@ -134,7 +116,6 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
       builder = CatalogProtos.PartitionDescProto.newBuilder();
     }
 
-    builder.setTableId(tableId);
     if(this.partitionName != null) {
       builder.setPartitionName(partitionName);
     }
@@ -169,7 +150,6 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
   public Object clone() throws CloneNotSupportedException {
     PartitionDesc desc = (PartitionDesc) super.clone();
     desc.builder = CatalogProtos.PartitionDescProto.newBuilder();
-    desc.tableId = tableId;
     desc.partitionName = partitionName;
     desc.ordinalPosition = ordinalPosition;
     desc.partitionValue = partitionValue;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
index 4a8123a..48a105c 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionMethodDesc.java
@@ -29,35 +29,44 @@ import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.util.TUtil;
 
+import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
+import static org.apache.tajo.catalog.proto.CatalogProtos.TableIdentifierProto;
+
 /**
  * <code>PartitionMethodDesc</code> presents a table description, including partition type, and partition keys.
  */
 public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionMethodProto>, Cloneable, GsonObject {
   private CatalogProtos.PartitionMethodProto.Builder builder;
 
-  @Expose private String tableId;                                       // required
-  @Expose private CatalogProtos.PartitionType partitionType;            // required
-  @Expose private String expression;                                    // required
-  @Expose private Schema expressionSchema;                              // required
+  @Expose private String databaseName;                         // required
+  @Expose private String tableName;                            // required
+  @Expose private PartitionType partitionType;                 // required
+  @Expose private String expression;                           // required
+  @Expose private Schema expressionSchema;                     // required
 
   public PartitionMethodDesc() {
     builder = CatalogProtos.PartitionMethodProto.newBuilder();
   }
 
-  public PartitionMethodDesc(String tableId, CatalogProtos.PartitionType partitionType, String expression,
+  public PartitionMethodDesc(String databaseName, String tableName,
+                             PartitionType partitionType, String expression,
                              Schema expressionSchema) {
-    this.tableId = tableId;
+    this.databaseName = databaseName;
+    this.tableName = tableName;
     this.partitionType = partitionType;
     this.expression = expression;
     this.expressionSchema = expressionSchema;
   }
 
   public PartitionMethodDesc(CatalogProtos.PartitionMethodProto proto) {
-    this(proto.getTableId(), proto.getPartitionType(), proto.getExpression(), new Schema(proto.getExpressionSchema()));
+    this(proto.getTableIdentifier().getDatabaseName(),
+        proto.getTableIdentifier().getTableName(),
+        proto.getPartitionType(), proto.getExpression(),
+        new Schema(proto.getExpressionSchema()));
   }
 
-  public String getTableId() {
-    return tableId;
+  public String getTableName() {
+    return tableName;
   }
 
   public String getExpression() {
@@ -68,19 +77,19 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
     return expressionSchema;
   }
 
-  public CatalogProtos.PartitionType getPartitionType() {
+  public PartitionType getPartitionType() {
     return partitionType;
   }
 
-  public void setTableId(String tableId) {
-    this.tableId = tableId;
+  public void setTableName(String tableId) {
+    this.tableName = tableId;
   }
 
   public void setExpressionSchema(Schema expressionSchema) {
     this.expressionSchema = expressionSchema;
   }
 
-  public void setPartitionType(CatalogProtos.PartitionType partitionsType) {
+  public void setPartitionType(PartitionType partitionsType) {
     this.partitionType = partitionsType;
   }
 
@@ -92,7 +101,7 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
   public boolean equals(Object object) {
     if(object instanceof PartitionMethodDesc) {
       PartitionMethodDesc other = (PartitionMethodDesc) object;
-      boolean eq = tableId.equals(other.tableId);
+      boolean eq = tableName.equals(other.tableName);
       eq = eq && partitionType.equals(other.partitionType);
       eq = eq && expression.equals(other.expression);
       eq = eq && TUtil.checkEquals(expressionSchema, other.expressionSchema);
@@ -104,7 +113,7 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(tableId, partitionType, expression, expressionSchema);
+    return Objects.hashCode(tableName, partitionType, expression, expressionSchema);
   }
 
   @Override
@@ -112,7 +121,17 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
     if(builder == null) {
       builder = CatalogProtos.PartitionMethodProto.newBuilder();
     }
-    builder.setTableId(tableId);
+
+    TableIdentifierProto.Builder tableIdentifierBuilder = TableIdentifierProto.newBuilder();
+    if (databaseName != null) {
+      tableIdentifierBuilder.setDatabaseName(databaseName);
+    }
+    if (tableName != null) {
+      tableIdentifierBuilder.setTableName(tableName);
+    }
+
+    CatalogProtos.PartitionMethodProto.Builder builder = CatalogProtos.PartitionMethodProto.newBuilder();
+    builder.setTableIdentifier(tableIdentifierBuilder.build());
     builder.setPartitionType(partitionType);
     builder.setExpression(expression);
     builder.setExpressionSchema(expressionSchema.getProto());
@@ -123,7 +142,7 @@ public class PartitionMethodDesc implements ProtoObject<CatalogProtos.PartitionM
   public Object clone() throws CloneNotSupportedException {
     PartitionMethodDesc desc = (PartitionMethodDesc) super.clone();
     desc.builder = builder;
-    desc.tableId = tableId;
+    desc.tableName = tableName;
     desc.partitionType = partitionType;
     desc.expression = expression;
     desc.expressionSchema = (Schema) expressionSchema.clone();

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index 35171cc..14fb39f 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -40,16 +40,6 @@ enum OrderType {
     DSC = 2;
 }
 
-enum CompressType {
-    COMP_NONE = 0;
-    NULL_SUPPRESS = 1;
-    RUN_LENGTH = 2;
-    BIT_VECTOR = 3;
-    DICTIONARY = 4;
-    SNAPPY = 5;
-    LZ = 6;
-}
-
 enum PartitionType {
     RANGE = 0;
     HASH = 1;
@@ -57,18 +47,6 @@ enum PartitionType {
     COLUMN = 3;
 }
 
-message ColumnMetaProto {
-    required DataType dataType = 1;
-    required bool compressed = 2;
-    required bool sorted = 3;
-    required bool contiguous = 4;
-    required StoreType storeType = 5;
-    required CompressType compType = 6;
-    required int64 startRid = 7;
-    required int32 recordNum = 8;
-    required int32 offsetToIndex = 9;
-}
-
 message ColumnProto {
 	required string name = 1;
 	required DataType dataType = 3;
@@ -102,17 +80,38 @@ message FileFragmentProto {
 }
 
 message TableProto {
-    required StoreType storeType = 1;
-    optional KeyValueSetProto params = 2;
+  required StoreType storeType = 1;
+  optional KeyValueSetProto params = 2;
+}
+
+message CreateTablespaceRequest {
+  required string tablespace_name = 1;
+  required string tablespace_uri = 2;
+}
+
+message CreateDatabaseRequest {
+  required string database_name = 1;
+  optional string tablespace_name = 2;
 }
 
 message TableDescProto {
-	required string id = 1;
-	required string path = 2;
-	required TableProto meta = 3;
-	required SchemaProto schema = 4;
-	optional TableStatsProto stats = 5;
-  optional PartitionMethodProto partition = 6;
+	required string table_name = 2;
+	optional string path = 3;
+	required TableProto meta = 4;
+	required SchemaProto schema = 5;
+	optional TableStatsProto stats = 6;
+  optional PartitionMethodProto partition = 7;
+  optional bool isExternal = 8 [default = false];
+}
+
+message TableIdentifierProto {
+  required string database_name = 1;
+  required string table_name = 3;
+}
+
+message NamespaceProto {
+  required string database_name = 1;
+  optional string namespace = 2;
 }
 
 enum FunctionType {
@@ -136,8 +135,8 @@ message FunctionDescProto {
 }
 
 message IndexDescProto {
-    required string name = 1;
-    required string tableId = 2;
+    required TableIdentifierProto tableIdentifier = 1;
+    required string indexName = 2;
     required ColumnProto column = 3;
     required IndexMethod indexMethod = 4;
     optional bool isUnique = 5 [default = false];
@@ -156,11 +155,17 @@ message GetAllTableNamesResponse {
     repeated string tableName = 1;
 }
 
-message GetIndexRequest {
-    required string tableName = 1;
+message GetIndexByColumnRequest {
+    required TableIdentifierProto tableIdentifier = 1;
     required string columnName = 2;
 }
 
+message IndexNameProto {
+  required string databaseName = 1;
+  optional string namespace = 2;
+  required string indexName = 3;
+}
+
 message GetFunctionsResponse {
 	repeated FunctionDescProto functionDesc = 1;
 }
@@ -239,18 +244,18 @@ message SortSpecProto {
 
 
 message PartitionsProto {
-  repeated PartitionDescProto partition = 1;
+  required TableIdentifierProto tableIdentifier = 1;
+  repeated PartitionDescProto partition = 2;
 }
 
 message PartitionMethodProto {
-  required string tableId = 1;
+  required TableIdentifierProto tableIdentifier = 1;
   required PartitionType partitionType = 2;
   required string expression = 3;
   required SchemaProto expressionSchema = 4;
 }
 
 message PartitionDescProto {
-  required string tableId = 1;
   optional string partitionName = 2;
   required int32  ordinalPosition = 3;
   optional string partitionValue = 4;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
index 065ec9d..f2d9f89 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestCatalogUtil.java
@@ -26,7 +26,8 @@ import static org.junit.Assert.assertEquals;
 public class TestCatalogUtil {
   @Test
   public final void testGetCanonicalName() {
-    String canonical = CatalogUtil.getCanonicalName("sum", CatalogUtil.newSimpleDataTypeArray(Type.INT4, Type.INT8));
+    String canonical = CatalogUtil.getCanonicalSignature("sum", CatalogUtil.newSimpleDataTypeArray(Type.INT4,
+        Type.INT8));
     assertEquals("sum(int4,int8)", canonical);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
index 6a9adc7..3fa8f53 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestIndexDesc.java
@@ -18,13 +18,14 @@
 
 package org.apache.tajo.catalog;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexMethod;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
 
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
 
@@ -35,15 +36,15 @@ public class TestIndexDesc {
   
   static {
     desc1 = new IndexDesc(
-        "idx_test", "indexed", new Column("id", Type.INT4),
+        "idx_test", DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4),
         IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
     
     desc2 = new IndexDesc(
-        "idx_test2", "indexed", new Column("score", Type.FLOAT8),
+        "idx_test2", DEFAULT_DATABASE_NAME, "indexed", new Column("score", Type.FLOAT8),
         IndexMethod.TWO_LEVEL_BIN_TREE, false, false, false);
     
     desc3 = new IndexDesc(
-        "idx_test", "indexed", new Column("id", Type.INT4),
+        "idx_test", DEFAULT_DATABASE_NAME, "indexed", new Column("id", Type.INT4),
         IndexMethod.TWO_LEVEL_BIN_TREE, true, true, true);
   }
 
@@ -64,16 +65,16 @@ public class TestIndexDesc {
 
   @Test
   public void testGetFields() {
-    assertEquals("idx_test", desc1.getName());
-    assertEquals("indexed", desc1.getTableId());
+    assertEquals("idx_test", desc1.getIndexName());
+    assertEquals("indexed", desc1.getTableName());
     assertEquals(new Column("id", Type.INT4), desc1.getColumn());
     assertEquals(IndexMethod.TWO_LEVEL_BIN_TREE, desc1.getIndexMethod());
     assertEquals(true, desc1.isUnique());
     assertEquals(true, desc1.isClustered());
     assertEquals(true, desc1.isAscending());
     
-    assertEquals("idx_test2", desc2.getName());
-    assertEquals("indexed", desc2.getTableId());
+    assertEquals("idx_test2", desc2.getIndexName());
+    assertEquals("indexed", desc2.getTableName());
     assertEquals(new Column("score", Type.FLOAT8), desc2.getColumn());
     assertEquals(IndexMethod.TWO_LEVEL_BIN_TREE, desc2.getIndexMethod());
     assertEquals(false, desc2.isUnique());

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
index d6f75ba..b7a9906 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestTableDesc.java
@@ -21,13 +21,13 @@ package org.apache.tajo.catalog;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.json.CatalogGsonHelper;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index 5387673..50726a6 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -18,78 +18,67 @@
 
 package org.apache.tajo.catalog.store;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.exception.AlreadyExistsDatabaseException;
 import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.exception.NoSuchDatabaseException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.exception.NotImplementedException;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.exception.InternalException;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType;
 
 public class HCatalogStore extends CatalogConstants implements CatalogStore {
-
   protected final Log LOG = LogFactory.getLog(getClass());
+  /** Configuration key string {@code hive.metastore.warehouse.dir}; constant, so declared final. */
+  private static final String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+
   protected Configuration conf;
   private static final int CLIENT_POOL_SIZE = 2;
   private final HCatalogStoreClientPool clientPool;
+  private final String defaultTableSpaceUri;
 
-  public HCatalogStore(final Configuration conf)
-      throws InternalException {
-    this(conf, new HCatalogStoreClientPool(CLIENT_POOL_SIZE, conf));
-  }
-
-  public HCatalogStore(final Configuration conf, HCatalogStoreClientPool pool)
-      throws InternalException {
+  public HCatalogStore(final Configuration conf) throws InternalException {
     this.conf = conf;
-    this.clientPool = pool;
+    this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) conf).toString();
+    this.clientPool = new HCatalogStoreClientPool(CLIENT_POOL_SIZE, conf);
   }
 
   @Override
-  public boolean existTable(final String name) throws CatalogException {
+  public boolean existTable(final String databaseName, final String tableName) throws CatalogException {
     boolean exist = false;
-
-    String dbName = null, tableName = null;
-    Pair<String, String> tablePair = null;
     org.apache.hadoop.hive.ql.metadata.Table table = null;
     HCatalogStoreClientPool.HCatalogStoreClient client = null;
-    // get db name and table name.
-    try {
-      tablePair = HCatUtil.getDbAndTableName(name);
-      dbName = tablePair.first;
-      tableName = tablePair.second;
-    } catch (Exception ioe) {
-      throw new CatalogException("Table name is wrong.", ioe);
-    }
 
     // get table
     try {
       try {
         client = clientPool.getClient();
-        table = HCatUtil.getTable(client.getHiveClient(), dbName, tableName);
+        table = HCatUtil.getTable(client.getHiveClient(), databaseName, tableName);
         if (table != null) {
           exist = true;
         }
@@ -106,9 +95,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public final CatalogProtos.TableDescProto getTable(final String name) throws CatalogException {
-    String dbName = null, tableName = null;
-    Pair<String, String> tablePair = null;
+  public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName) throws CatalogException {
     org.apache.hadoop.hive.ql.metadata.Table table = null;
     HCatalogStoreClientPool.HCatalogStoreClient client = null;
     Path path = null;
@@ -118,15 +105,6 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
     TableStats stats = null;
     PartitionMethodDesc partitions = null;
 
-    // get db name and table name.
-    try {
-      tablePair = HCatUtil.getDbAndTableName(name);
-      dbName = tablePair.first;
-      tableName = tablePair.second;
-    } catch (Exception ioe) {
-      throw new CatalogException("Table name is wrong.", ioe);
-    }
-
     //////////////////////////////////
     // set tajo table schema.
     //////////////////////////////////
@@ -134,10 +112,10 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
       // get hive table schema
       try {
         client = clientPool.getClient();
-        table = HCatUtil.getTable(client.getHiveClient(), dbName, tableName);
+        table = HCatUtil.getTable(client.getHiveClient(), databaseName, tableName);
         path = table.getPath();
       } catch (NoSuchObjectException nsoe) {
-        throw new CatalogException("Table not found. - tableName:" + name, nsoe);
+        throw new CatalogException("Table not found. - tableName:" + tableName, nsoe);
       } catch (Exception e) {
         throw new CatalogException(e);
       }
@@ -149,7 +127,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
       try {
         tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
       } catch (IOException ioe) {
-        throw new CatalogException("Fail to get table schema. - tableName:" + name, ioe);
+        throw new CatalogException("Fail to get table schema. - tableName:" + tableName, ioe);
       }
       List<HCatFieldSchema> fieldSchemaList = tableSchema.getFields();
       boolean isPartitionKey = false;
@@ -165,7 +143,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
         }
 
         if (!isPartitionKey) {
-          String fieldName = dbName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
+          String fieldName = databaseName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
               CatalogUtil.IDENTIFIER_DELIMITER + eachField.getName();
           TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(eachField.getType().toString());
           schema.addColumn(fieldName, dataType);
@@ -180,7 +158,8 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
       }
 
       stats = new TableStats();
-      options = Options.create();
+      options = new Options();
+      options.putAll(table.getParameters());
       Properties properties = table.getMetadata();
       if (properties != null) {
         // set field delimiter
@@ -243,7 +222,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
           for(int i = 0; i < partitionKeys.size(); i++) {
             FieldSchema fieldSchema = partitionKeys.get(i);
             TajoDataTypes.Type dataType = HCatalogUtil.getTajoFieldType(fieldSchema.getType().toString());
-            String fieldName = dbName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
+            String fieldName = databaseName + CatalogUtil.IDENTIFIER_DELIMITER + tableName +
                 CatalogUtil.IDENTIFIER_DELIMITER + fieldSchema.getName();
             expressionSchema.addColumn(new Column(fieldName, dataType));
             if (i > 0) {
@@ -252,7 +231,8 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
             sb.append(fieldSchema.getName());
           }
           partitions = new PartitionMethodDesc(
-              dbName + "." + tableName,
+              databaseName,
+              tableName,
               PartitionType.COLUMN,
               sb.toString(),
               expressionSchema);
@@ -263,7 +243,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
     }
     TableMeta meta = new TableMeta(storeType, options);
 
-    TableDesc tableDesc = new TableDesc(dbName + "." + tableName, schema, meta, path);
+    TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path);
     if (stats != null) {
       tableDesc.setStats(stats);
     }
@@ -284,57 +264,134 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public final List<String> getAllTableNames() throws CatalogException {
-    List<String> dbs = null;
-    List<String> tables = null;
-    List<String> allTables = new ArrayList<String>();
+  public final List<String> getAllTableNames(String databaseName) throws CatalogException {
     HCatalogStoreClientPool.HCatalogStoreClient client = null;
 
     try {
       client = clientPool.getClient();
-      dbs = client.getHiveClient().getAllDatabases();
-      for(String eachDB: dbs) {
-        tables = client.getHiveClient().getAllTables(eachDB);
-        for(String eachTable: tables) {
-          allTables.add(eachDB + "." + eachTable);
-        }
-      }
+      return client.getHiveClient().getAllTables(databaseName);
     } catch (MetaException e) {
       throw new CatalogException(e);
     } finally {
       client.release();
     }
-    return allTables;
   }
 
   @Override
-  public final void addTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException {
-    String dbName = null, tableName = null;
-    Pair<String, String> tablePair = null;
+  public void createTablespace(String spaceName, String spaceUri) throws CatalogException {
+    // SKIP
+  }
+
+  @Override
+  public boolean existTablespace(String spaceName) throws CatalogException {
+    // SKIP
+    return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME);
+  }
+
+  @Override
+  public void dropTablespace(String spaceName) throws CatalogException {
+    // SKIP
+  }
+
+  @Override
+  public Collection<String> getAllTablespaceNames() throws CatalogException {
+    return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME);
+  }
+
+  @Override
+  public void createDatabase(String databaseName, String tablespaceName) throws CatalogException {
     HCatalogStoreClientPool.HCatalogStoreClient client = null;
 
-    TableDesc tableDesc = new TableDesc(tableDescProto);
-    // get db name and table name.
     try {
-      tablePair = HCatUtil.getDbAndTableName(tableDesc.getName());
-      dbName = tablePair.first;
-      tableName = tablePair.second;
-    } catch (Exception ioe) {
-      throw new CatalogException("Table name is wrong.", ioe);
+      Database database = new Database(
+          databaseName,
+          "",
+          defaultTableSpaceUri + "/" + databaseName,
+          new HashMap<String, String>());
+      client = clientPool.getClient();
+      client.getHiveClient().createDatabase(database);
+    } catch (AlreadyExistsException e) {
+      throw new AlreadyExistsDatabaseException(databaseName);
+    } catch (Throwable t) {
+      throw new CatalogException(t);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
     }
+  }
+
+  @Override
+  public boolean existDatabase(String databaseName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
 
     try {
       client = clientPool.getClient();
+      List<String> databaseNames = client.getHiveClient().getAllDatabases();
+      return databaseNames.contains(databaseName);
+    } catch (Throwable t) {
+      throw new CatalogException(t);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
 
-      org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table();
+  @Override
+  public void dropDatabase(String databaseName) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
 
-      table.setDbName(dbName);
+    try {
+      client = clientPool.getClient();
+      client.getHiveClient().dropDatabase(databaseName);
+    } catch (NoSuchObjectException e) {
+      throw new NoSuchDatabaseException(databaseName);
+    } catch (Throwable t) {
+      throw new CatalogException(t); // wrap the cause, as createDatabase/existDatabase do
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  @Override
+  public Collection<String> getAllDatabaseNames() throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    try {
+      client = clientPool.getClient();
+      return client.getHiveClient().getAllDatabases();
+    } catch (MetaException e) {
+      throw new CatalogException(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  @Override
+  public final void createTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+
+    TableDesc tableDesc = new TableDesc(tableDescProto);
+    String [] splitted = CatalogUtil.splitFQTableName(CatalogUtil.normalizeIdentifier(tableDesc.getName()));
+    String databaseName = splitted[0];
+    String tableName = splitted[1];
+
+    try {
+      client = clientPool.getClient();
+
+      org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table();
+      table.setDbName(databaseName);
       table.setTableName(tableName);
+      table.setParameters(new HashMap<String, String>(tableDesc.getMeta().getOptions().getAllKeyValus()));
       // TODO: set owner
       //table.setOwner();
 
       StorageDescriptor sd = new StorageDescriptor();
-      sd.setParameters(new HashMap<String, String>());
       sd.setSerdeInfo(new SerDeInfo());
       sd.getSerdeInfo().setParameters(new HashMap<String, String>());
       sd.getSerdeInfo().setName(table.getTableName());
@@ -342,7 +399,18 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
       // if tajo set location method, thrift client make exception as follows:
       // Caused by: MetaException(message:java.lang.NullPointerException)
       // If you want to modify table path, you have to modify on Hive cli.
-      // sd.setLocation(tableDesc.getPath().toString());
+      if (tableDesc.isExternal()) {
+        table.setTableType(TableType.EXTERNAL_TABLE.name());
+        table.getParameters().put("EXTERNAL", "TRUE");
+
+        FileSystem fs = tableDesc.getPath().getFileSystem(conf);
+        if (fs.isFile(tableDesc.getPath())) {
+          LOG.warn("A table path is a file, but HCatalog does not allow a file path.");
+          sd.setLocation(tableDesc.getPath().getParent().toString());
+        } else {
+          sd.setLocation(tableDesc.getPath().toString());
+        }
+      }
 
       // set column information
       List<Column> columns = tableDesc.getSchema().getColumns();
@@ -350,7 +418,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
 
       for (Column eachField : columns) {
         cols.add(new FieldSchema(eachField.getSimpleName(),
-            HCatalogUtil.getHiveFieldType(eachField.getDataType().getType().name()), ""));
+            HCatalogUtil.getHiveFieldType(eachField.getDataType()), ""));
       }
       sd.setCols(cols);
 
@@ -359,7 +427,7 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
         List<FieldSchema> partitionKeys = new ArrayList<FieldSchema>();
         for(Column eachPartitionKey: tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) {
           partitionKeys.add(new FieldSchema( eachPartitionKey.getSimpleName(),
-              HCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType().getType().name()), ""));
+              HCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), ""));
         }
         table.setPartitionKeys(partitionKeys);
       }
@@ -418,23 +486,12 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public final void deleteTable(final String name) throws CatalogException {
-    String dbName = null, tableName = null;
-    Pair<String, String> tablePair = null;
+  public final void dropTable(String databaseName, final String tableName) throws CatalogException {
     HCatalogStoreClientPool.HCatalogStoreClient client = null;
 
-    // get db name and table name.
-    try {
-      tablePair = HCatUtil.getDbAndTableName(name);
-      dbName = tablePair.first;
-      tableName = tablePair.second;
-    } catch (Exception ioe) {
-      throw new CatalogException("Table name is wrong.", ioe);
-    }
-
     try {
       client = clientPool.getClient();
-      client.getHiveClient().dropTable(dbName, tableName, false, false);
+      client.getHiveClient().dropTable(databaseName, tableName, false, false);
     } catch (NoSuchObjectException nsoe) {
     } catch (Exception e) {
       throw new CatalogException(e);
@@ -449,17 +506,18 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public CatalogProtos.PartitionMethodProto getPartitionMethod(String tableName) throws CatalogException {
+  public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName)
+      throws CatalogException {
     return null;  // TODO - not implemented yet
   }
 
   @Override
-  public boolean existPartitionMethod(String tableName) throws CatalogException {
+  public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException {
     return false;  // TODO - not implemented yet
   }
 
   @Override
-  public void delPartitionMethod(String tableName) throws CatalogException {
+  public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException {
     // TODO - not implemented yet
   }
 
@@ -469,8 +527,8 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public void addPartition(CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException {
-    // TODO - not implemented yet
+  public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException {
+
   }
 
   @Override
@@ -489,10 +547,11 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public void delPartitions(String tableName) throws CatalogException {
-    // TODO - not implemented yet
+  public void dropPartitions(String tableName) throws CatalogException {
+
   }
 
+
   @Override
   public final void addFunction(final FunctionDesc func) throws CatalogException {
     // TODO - not implemented yet
@@ -515,41 +574,42 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
   }
 
   @Override
-  public void delIndex(String indexName) throws CatalogException {
+  public void dropIndex(String databaseName, String indexName) throws CatalogException {
     // TODO - not implemented yet
   }
 
   @Override
-  public boolean existIndex(String indexName) throws CatalogException {
+  public boolean existIndexByName(String databaseName, String indexName) throws CatalogException {
     // TODO - not implemented yet
     return false;
   }
 
   @Override
-  public CatalogProtos.IndexDescProto[] getIndexes(String tableName) throws CatalogException {
+  public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException {
     // TODO - not implemented yet
     return null;
   }
 
   @Override
-  public void addIndex(CatalogProtos.IndexDescProto proto) throws CatalogException {
+  public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException {
     // TODO - not implemented yet
   }
 
   @Override
-  public CatalogProtos.IndexDescProto getIndex(String indexName) throws CatalogException {
+  public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException {
     // TODO - not implemented yet
     return null;
   }
 
   @Override
-  public CatalogProtos.IndexDescProto getIndex(String tableName, String columnName) throws CatalogException {
+  public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName)
+      throws CatalogException {
     // TODO - not implemented yet
     return null;
   }
 
   @Override
-  public boolean existIndex(String tableName, String columnName) throws CatalogException{
+  public boolean existIndexByColumn(String databaseName, String tableName, String columnName) throws CatalogException {
     // TODO - not implemented yet
     return false;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java
index cc8ff08..5c416c6 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java
@@ -47,8 +47,9 @@ public class HCatalogStoreClientPool {
 
     private HCatalogStoreClient(HiveConf hiveConf) {
       try {
-        LOG.info("Creating MetaStoreClient. Pool Size = " + clientPool.size());
         this.hiveClient = new HiveMetaStoreClient(hiveConf);
+        clientPool.add(this);
+        LOG.info("MetaStoreClient created (size = " + clientPool.size() + ")");
       } catch (Exception e) {
         // Turn in to an unchecked exception
         throw new IllegalStateException(e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java
index fc2fdda..d426369 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java
@@ -87,30 +87,30 @@ public class HCatalogUtil {
     }
   }
 
-  public static String getHiveFieldType(String fieldType) {
-    Preconditions.checkNotNull(fieldType);
-    String typeStr = null;
+  public static String getHiveFieldType(TajoDataTypes.DataType dataType) {
+    Preconditions.checkNotNull(dataType);
 
-    if(fieldType.equalsIgnoreCase("INT4"))
-      typeStr = serdeConstants.INT_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("INT1"))
-      typeStr = serdeConstants.TINYINT_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("INT2"))
-      typeStr = serdeConstants.SMALLINT_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("INT8"))
-      typeStr = serdeConstants.BIGINT_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("BOOLEAN"))
-      typeStr = serdeConstants.BOOLEAN_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("FLOAT4"))
-      typeStr = serdeConstants.FLOAT_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("FLOAT8"))
-      typeStr = serdeConstants.DOUBLE_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("TEXT"))
-      typeStr = serdeConstants.STRING_TYPE_NAME;
-    else if(fieldType.equalsIgnoreCase("BLOB"))
-      typeStr = serdeConstants.BINARY_TYPE_NAME;
-
-    return typeStr;
+    switch (dataType.getType()) {
+    case CHAR: return serdeConstants.CHAR_TYPE_NAME;
+    case BOOLEAN: return serdeConstants.BOOLEAN_TYPE_NAME;
+    case INT1: return serdeConstants.TINYINT_TYPE_NAME;
+    case INT2: return serdeConstants.SMALLINT_TYPE_NAME;
+    case INT4: return serdeConstants.INT_TYPE_NAME;
+    case INT8: return serdeConstants.BIGINT_TYPE_NAME;
+    case FLOAT4: return serdeConstants.FLOAT_TYPE_NAME;
+    case FLOAT8: return serdeConstants.DOUBLE_TYPE_NAME;
+    case TEXT: return serdeConstants.STRING_TYPE_NAME;
+    case VARCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case NCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case NVARCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case BINARY: return serdeConstants.BINARY_TYPE_NAME;
+    case VARBINARY: return serdeConstants.BINARY_TYPE_NAME;
+    case BLOB: return serdeConstants.BINARY_TYPE_NAME;
+    case DATE: return serdeConstants.DATE_TYPE_NAME;
+    case TIMESTAMP: return serdeConstants.TIMESTAMP_TYPE_NAME;
+    default:
+      throw new CatalogException(dataType + " is not supported.");
+    }
   }
 
   public static String getStoreType(String fileFormat) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
index cb3973a..b9a734e 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
@@ -59,36 +58,23 @@ public class TestHCatalogStore {
   @BeforeClass
   public static void setUp() throws Exception {
     Path testPath = CommonTestingUtil.getTestDir();
-    warehousePath = new Path(testPath, DB_NAME);
+    warehousePath = new Path(testPath, "warehouse");
 
     //create local hiveMeta
     HiveConf conf = new HiveConf();
     String jdbcUri = "jdbc:derby:;databaseName="+testPath.toUri().getPath()+"/metastore_db;create=true";
     conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath.toUri().toString());
     conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, jdbcUri);
+    conf.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, warehousePath.toUri().toString());
 
     // create local HCatalogStore.
     TajoConf tajoConf = new TajoConf(conf);
-    Database db = new Database();
-    db.setLocationUri(warehousePath.toUri().toString());
-    db.setName(DB_NAME);
-    pool = new HCatalogStoreClientPool(1, tajoConf);
-    HCatalogStoreClientPool.HCatalogStoreClient client = pool.getClient();
-    client.getHiveClient().createDatabase(db);
-    client.release();
-
-    store = new HCatalogStore(tajoConf, pool);
+    store = new HCatalogStore(tajoConf);
+    store.createDatabase(DB_NAME, null);
   }
 
   @AfterClass
   public static void tearDown() throws IOException {
-    try {
-      HCatalogStoreClientPool.HCatalogStoreClient client = pool.getClient();
-      client.getHiveClient().dropDatabase(DB_NAME);
-      client.release();
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
     store.close();
   }
 
@@ -106,14 +92,14 @@ public class TestHCatalogStore {
     schema.addColumn("c_mktsegment", TajoDataTypes.Type.TEXT);
     schema.addColumn("c_comment", TajoDataTypes.Type.TEXT);
 
-    String tableName = DB_NAME + "." + CUSTOMER;
-    TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(tableName));
+    TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, CUSTOMER), schema, meta,
+        new Path(warehousePath, new Path(DB_NAME, CUSTOMER)));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, CUSTOMER));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(new Path(table.getPath(), CUSTOMER), table1.getPath());
+    assertEquals(table.getPath(), table1.getPath());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -121,7 +107,7 @@ public class TestHCatalogStore {
 
     assertEquals(StringEscapeUtils.escapeJava(CatalogConstants.CSVFILE_DELIMITER_DEFAULT),
         table1.getMeta().getOption(CatalogConstants.CSVFILE_DELIMITER));
-    store.deleteTable(tableName);
+    store.dropTable(DB_NAME, CUSTOMER);
   }
 
   @Test
@@ -135,14 +121,14 @@ public class TestHCatalogStore {
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
 
-    String tableName = DB_NAME + "." + REGION;
-    TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(tableName));
+    TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta,
+        new Path(warehousePath, new Path(DB_NAME, REGION)));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, REGION));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(new Path(table.getPath(), REGION), table1.getPath());
+    assertEquals(table.getPath(), table1.getPath());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -150,7 +136,7 @@ public class TestHCatalogStore {
 
     assertEquals(CatalogConstants.RCFILE_BINARY_SERDE,
         table1.getMeta().getOption(CatalogConstants.RCFILE_SERDE));
-    store.deleteTable(tableName);
+    store.dropTable(DB_NAME, REGION);
   }
 
   @Test
@@ -164,21 +150,21 @@ public class TestHCatalogStore {
     schema.addColumn("r_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("r_comment", TajoDataTypes.Type.TEXT);
 
-    String tableName = DB_NAME + "." + REGION;
-    TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(tableName));
+    TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta,
+        new Path(warehousePath, new Path(DB_NAME, REGION)));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, REGION));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(new Path(table.getPath(), REGION), table1.getPath());
+    assertEquals(table.getPath(), table1.getPath());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
     }
 
     assertEquals(CatalogConstants.RCFILE_TEXT_SERDE, table1.getMeta().getOption(CatalogConstants.RCFILE_SERDE));
-    store.deleteTable(tableName);
+    store.dropTable(DB_NAME, REGION);
   }
 
   @Test
@@ -198,16 +184,16 @@ public class TestHCatalogStore {
     schema.addColumn("s_acctbal", TajoDataTypes.Type.FLOAT8);
     schema.addColumn("s_comment", TajoDataTypes.Type.TEXT);
 
-    String tableName = DB_NAME + "." + SUPPLIER;
-    TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
+    TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, SUPPLIER), schema, meta,
+        new Path(warehousePath, new Path(DB_NAME, SUPPLIER)));
 
 
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(tableName));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, SUPPLIER));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(new Path(table.getPath(), SUPPLIER), table1.getPath());
+    assertEquals(table.getPath(), table1.getPath());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -218,7 +204,7 @@ public class TestHCatalogStore {
 
     assertEquals(table.getMeta().getOption(CatalogConstants.CSVFILE_NULL),
         table1.getMeta().getOption(CatalogConstants.CSVFILE_NULL));
-    store.deleteTable(tableName);
+    store.dropTable(DB_NAME, SUPPLIER);
   }
 
   @Test
@@ -231,22 +217,24 @@ public class TestHCatalogStore {
     schema.addColumn("n_comment", TajoDataTypes.Type.TEXT);
 
 
-    String tableName = DB_NAME + "." + NATION;
-    TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
+    TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, NATION), schema, meta,
+        new Path(warehousePath, new Path(DB_NAME, NATION)));
 
     org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
     expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4);
 
-    PartitionMethodDesc partitions = new PartitionMethodDesc(table.getName(),
+    PartitionMethodDesc partitions = new PartitionMethodDesc(
+        DB_NAME,
+        NATION,
         CatalogProtos.PartitionType.COLUMN, expressionSchema.getColumn(0).getQualifiedName(), expressionSchema);
     table.setPartitionMethod(partitions);
 
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(table.getName()));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, NATION));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, NATION));
     assertEquals(table.getName(), table1.getName());
-    assertEquals(new Path(table.getPath(), NATION), table1.getPath());
+    assertEquals(table.getPath(), table1.getPath());
     assertEquals(table.getSchema().size(), table1.getSchema().size());
     for (int i = 0; i < table.getSchema().size(); i++) {
       assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
@@ -260,7 +248,7 @@ public class TestHCatalogStore {
       assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName());
     }
 
-    store.deleteTable(tableName);
+    store.dropTable(DB_NAME, NATION);
   }
 
 
@@ -272,14 +260,15 @@ public class TestHCatalogStore {
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
     schema.addColumn("n_comment", TajoDataTypes.Type.TEXT);
 
-    String[] tableNames = new String[]{"default.table1", "default.table2", "default.table3"};
+    String[] tableNames = new String[]{"table1", "table2", "table3"};
 
     for(String tableName : tableNames){
-      TableDesc table = new TableDesc(tableName, schema, meta, warehousePath);
-      store.addTable(table.getProto());
+      TableDesc table = new TableDesc(CatalogUtil.buildFQName("default", tableName), schema, meta,
+          new Path(warehousePath, new Path(DB_NAME, tableName)));
+      store.createTable(table.getProto());
     }
 
-    List<String> tables = store.getAllTableNames();
+    List<String> tables = store.getAllTableNames("default");
     assertEquals(tableNames.length, tables.size());
 
     for(String tableName : tableNames){
@@ -287,7 +276,7 @@ public class TestHCatalogStore {
     }
 
     for(String tableName : tableNames){
-      store.deleteTable(tableName);
+      store.dropTable("default", tableName);
     }
   }
 
@@ -301,15 +290,15 @@ public class TestHCatalogStore {
 
     String tableName = "table1";
     TableDesc table = new TableDesc(DB_NAME + "." + tableName, schema, meta, warehousePath);
-    store.addTable(table.getProto());
-    assertTrue(store.existTable(table.getName()));
+    store.createTable(table.getProto());
+    assertTrue(store.existTable(DB_NAME, tableName));
 
-    TableDesc table1 = new TableDesc(store.getTable(table.getName()));
+    TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName));
     FileSystem fs = FileSystem.getLocal(new Configuration());
     assertTrue(fs.exists(table1.getPath()));
 
-    store.deleteTable(table1.getName());
-    assertFalse(store.existTable(table1.getName()));
+    store.dropTable(DB_NAME, tableName);
+    assertFalse(store.existTable(DB_NAME, tableName));
     fs.close();
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3ba26241/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 621b475..d4ee769 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.annotation.ThreadSafe;
 import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.*;
@@ -38,6 +39,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.ProtoUtil;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
@@ -49,13 +51,18 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
+import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
 
 /**
  * This class provides the catalog service. The catalog service enables clients
  * to register, unregister and access information about tables, functions, and
  * cluster information.
  */
+@ThreadSafe
 public class CatalogServer extends AbstractService {
+
+  private final static String DEFAULT_NAMESPACE = "public";
+
   private final static Log LOG = LogFactory.getLog(CatalogServer.class);
   private TajoConf conf;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -96,14 +103,14 @@ public class CatalogServer extends AbstractService {
   }
 
   @Override
-  public void init(Configuration conf) {
+  public void serviceInit(Configuration conf) throws Exception {
 
     Constructor<?> cons;
     try {
       if (conf instanceof TajoConf) {
         this.conf = (TajoConf) conf;
       } else {
-        throw new CatalogException();
+        throw new CatalogException("conf must be a TajoConf instance");
       }
 
       Class<?> storeClass = this.conf.getClass(CatalogConstants.STORE_CLASS, DerbyStore.class);
@@ -120,13 +127,17 @@ public class CatalogServer extends AbstractService {
       throw new CatalogException(t);
     }
 
-    super.init(conf);
+    super.serviceInit(conf);
   }
 
   public TajoConf getConf() {
     return conf;
   }
 
+  public String getStoreClassName() { // Reports the canonical class name of the catalog store instance.
+    return store.getClass().getCanonicalName();
+  }
+
   public String getCatalogServerName() {
     String catalogUri = null;
     if(conf.get(CatalogConstants.DEPRECATED_CATALOG_URI) != null) {
@@ -197,42 +208,204 @@ public class CatalogServer extends AbstractService {
   public class CatalogProtocolHandler implements CatalogProtocolService.BlockingInterface {
 
     @Override
-    public TableDescProto getTableDesc(RpcController controller,
-                                       StringProto name)
-        throws ServiceException {
+    public BoolProto createTablespace(RpcController controller, CreateTablespaceRequest request) throws ServiceException {
+      final String tablespaceName = CatalogUtil.normalizeIdentifier(request.getTablespaceName());
+      final String uri = request.getTablespaceUri();
+      // Registers a tablespace; the duplicate check and the store call both run while wlock is held.
+      wlock.lock();
+      try {
+        if (store.existTablespace(tablespaceName)) {
+          throw new AlreadyExistsDatabaseException(tablespaceName); // NOTE(review): database exception raised for a tablespace - confirm intended type
+        }
+
+        store.createTablespace(tablespaceName, uri);
+        LOG.info(String.format("tablespace \"%s\" (%s) is created", tablespaceName, uri));
+        return ProtoUtil.TRUE;
+        // Every failure, including the duplicate case above, is logged and rethrown as ServiceException.
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto dropTablespace(RpcController controller, StringProto request) throws ServiceException {
+      String tablespaceName = CatalogUtil.normalizeIdentifier(request.getValue());
+      // Drops the named tablespace; an unknown name raises NoSuchTablespaceException, wrapped in ServiceException.
+      wlock.lock();
+      try {
+        if (!store.existTablespace(tablespaceName)) {
+          throw new NoSuchTablespaceException(tablespaceName);
+        }
+
+        store.dropTablespace(tablespaceName);
+        return ProtoUtil.TRUE;
+
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existTablespace(RpcController controller, StringProto request) throws ServiceException {
+      String tablespaceName = CatalogUtil.normalizeIdentifier(request.getValue());
+      // Answers TRUE/FALSE for the normalized tablespace name; store failures are rethrown as ServiceException.
       rlock.lock();
       try {
-        String tableId = name.getValue().toLowerCase();
-        if (!store.existTable(tableId)) {
-          throw new NoSuchTableException(tableId);
+        if (store.existTablespace(tablespaceName)) {
+          return ProtoUtil.TRUE;
+        } else {
+          return ProtoUtil.FALSE;
         }
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
+      }
+    }
 
-        return store.getTable(tableId);
+    @Override
+    public StringListProto getAllTablespaceNames(RpcController controller, NullProto request) throws ServiceException {
+      rlock.lock(); // Lists tablespace names; store failures are logged and rethrown as ServiceException.
+      try {
+        return ProtoUtil.convertStrings(store.getAllTablespaceNames()); // tablespaces, not database names
       } catch (Exception e) {
-        // TODO - handle exception
         LOG.error(e);
-        return null;
+        throw new ServiceException(e);
       } finally {
         rlock.unlock();
       }
     }
 
     @Override
-    public GetAllTableNamesResponse getAllTableNames(RpcController controller,
-                                                     NullProto request)
+    public BoolProto createDatabase(RpcController controller, CreateDatabaseRequest request) throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tablespaceName = CatalogUtil.normalizeIdentifier(request.getTablespaceName());
+      // Creates the database unless it already exists. NOTE(review): tablespace existence is not checked here - presumably the store validates it; confirm.
+      wlock.lock();
+      try {
+        if (store.existDatabase(databaseName)) {
+          throw new AlreadyExistsDatabaseException(databaseName);
+        }
+
+        store.createDatabase(databaseName, tablespaceName);
+        LOG.info(String.format("database \"%s\" is created", databaseName));
+        return ProtoUtil.TRUE;
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto dropDatabase(RpcController controller, StringProto request) throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getValue());
+      // Drops the named database; an unknown name raises NoSuchDatabaseException, wrapped in ServiceException.
+      wlock.lock();
+      try {
+        if (!store.existDatabase(databaseName)) {
+          throw new NoSuchDatabaseException(databaseName);
+        }
+
+        store.dropDatabase(databaseName);
+        return ProtoUtil.TRUE;
+
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existDatabase(RpcController controller, StringProto request) throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getValue());
+      // Answers TRUE/FALSE for the normalized database name; store failures are rethrown as ServiceException.
+      rlock.lock();
+      try {
+        if (store.existDatabase(databaseName)) {
+          return ProtoUtil.TRUE;
+        } else {
+          return ProtoUtil.FALSE;
+        }
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public StringListProto getAllDatabaseNames(RpcController controller, NullProto request) throws ServiceException {
+      rlock.lock(); // Lists the database names reported by the store; failures are logged and rethrown as ServiceException.
+      try {
+        return ProtoUtil.convertStrings(store.getAllDatabaseNames());
+      } catch (Exception e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public TableDescProto getTableDesc(RpcController controller,
+                                       TableIdentifierProto request) throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(request.getTableName());
+      // Returns the table's descriptor; the existence checks and the fetch all run while rlock is held.
+      rlock.lock();
+      try {
+        boolean contain;
+        // A missing database and a missing table are reported with different exception types.
+        contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          contain = store.existTable(databaseName, tableName);
+          if (contain) {
+            return store.getTable(databaseName, tableName);
+          } else {
+            throw new NoSuchTableException(databaseName, tableName);
+          }
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
+        }
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public StringListProto getAllTableNames(RpcController controller, StringProto request)
         throws ServiceException {
+      // Lists the table names of one database; an unknown database raises NoSuchDatabaseException (wrapped in ServiceException).
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getValue());
+
+      rlock.lock();
       try {
-        Iterator<String> iterator = store.getAllTableNames().iterator();
-        GetAllTableNamesResponse.Builder builder =
-            GetAllTableNamesResponse.newBuilder();
-        while (iterator.hasNext()) {
-          builder.addTableName(iterator.next());
+        if (store.existDatabase(databaseName)) {
+          return ProtoUtil.convertStrings(store.getAllTableNames(databaseName));
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
         }
-        return builder.build();
       } catch (Exception e) {
-        // TODO - handle exception
         LOG.error(e);
-        return null;
+        throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
       }
     }
 
@@ -249,37 +422,61 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto addTable(RpcController controller, TableDescProto proto)
-        throws ServiceException {
+    public BoolProto createTable(RpcController controller, TableDescProto request)throws ServiceException {
+      // Adds a table to the catalog; the target database and table are derived from the request's table name.
+      String [] splitted =
+          CatalogUtil.splitFQTableName(CatalogUtil.normalizeIdentifier(request.getTableName()));
+      // NOTE(review): assumes the split yields [database, table]; this runs outside the try block - confirm behavior for unqualified names.
+      String databaseName = splitted[0];
+      String tableName = splitted[1];
 
       wlock.lock();
       try {
-        if (store.existTable(proto.getId().toLowerCase())) {
-          throw new AlreadyExistsTableException(proto.getId());
+        // Failures below (duplicate table, unknown database, store errors) are logged and reported as FALSE, not thrown.
+        boolean contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          if (store.existTable(databaseName, tableName)) {
+            throw new AlreadyExistsTableException(databaseName, tableName);
+          }
+
+          store.createTable(request);
+          LOG.info(String.format("relation \"%s\" is added to the catalog (%s)",
+              CatalogUtil.getCanonicalTableName(databaseName, tableName), bindAddressStr));
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
         }
-        store.addTable(proto);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
-        return BOOL_FALSE;
+        return ProtoUtil.FALSE;
       } finally {
         wlock.unlock();
-        LOG.info("Table " + proto.getId() + " is added to the catalog ("
-            + bindAddressStr + ")");
       }
 
-      return BOOL_TRUE;
+      return ProtoUtil.TRUE;
     }
 
     @Override
-    public BoolProto deleteTable(RpcController controller, StringProto name)
-        throws ServiceException {
+    public BoolProto dropTable(RpcController controller, TableIdentifierProto request) throws ServiceException {
+
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(request.getTableName());
+
       wlock.lock();
       try {
-        String tableId = name.getValue().toLowerCase();
-        if (!store.existTable(tableId)) {
-          throw new NoSuchTableException(tableId);
+        boolean contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          if (!store.existTable(databaseName, tableName)) {
+            throw new NoSuchTableException(databaseName, tableName);
+          }
+
+          store.dropTable(databaseName, tableName);
+          LOG.info(String.format("relation \"%s\" is deleted from the catalog (%s)",
+              CatalogUtil.getCanonicalTableName(databaseName, tableName), bindAddressStr));
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
         }
-        store.deleteTable(tableId);
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         return BOOL_FALSE;
@@ -291,76 +488,122 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto existsTable(RpcController controller, StringProto name)
+    public BoolProto existsTable(RpcController controller, TableIdentifierProto request)
         throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(request.getTableName());
+      // Answers whether the table exists; an unknown database is an error (NoSuchDatabaseException), not a FALSE result.
+      rlock.lock();
       try {
-        String tableId = name.getValue().toLowerCase();
-        if (store.existTable(tableId)) {
-          return BOOL_TRUE;
+
+        boolean contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          if (store.existTable(databaseName, tableName)) {
+            return BOOL_TRUE;
+          } else {
+            return BOOL_FALSE;
+          }
         } else {
-          return BOOL_FALSE;
+          throw new NoSuchDatabaseException(databaseName);
         }
       } catch (Exception e) {
         LOG.error(e);
         throw new ServiceException(e);
+      } finally {
+        rlock.unlock();
       }
+
     }
 
     @Override
     public PartitionMethodProto getPartitionMethodByTableName(RpcController controller,
-                                                              StringProto name)
+                                                              TableIdentifierProto request)
         throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(request.getTableName());
+      // Returns the table's partition method; a missing database, table or partition method each raise their own exception.
       rlock.lock();
       try {
-        String tableId = name.getValue().toLowerCase();
-        return store.getPartitionMethod(tableId);
+        boolean contain;
+
+        contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          contain = store.existTable(databaseName, tableName);
+          if (contain) {
+            if (store.existPartitionMethod(databaseName, tableName)) {
+              return store.getPartitionMethod(databaseName, tableName);
+            } else {
+              throw new NoPartitionedTableException(databaseName, tableName);
+            }
+          } else {
+            throw new NoSuchTableException(databaseName, tableName);
+          }
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
+        }
       } catch (Exception e) {
-        // TODO - handle exception
         LOG.error(e);
-        return null;
+        throw new ServiceException(e);
       } finally {
         rlock.unlock();
       }
     }
 
     @Override
-    public BoolProto existPartitionMethod(RpcController controller, StringProto tableName)
+    public BoolProto existPartitionMethod(RpcController controller, TableIdentifierProto request)
         throws ServiceException {
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(request.getTableName());
+      // Answers whether the table has a partition method; a missing database or table is an error, not a FALSE result.
       rlock.lock();
       try {
-        String tableId = tableName.getValue().toLowerCase();
-        return BoolProto.newBuilder().setValue(
-            store.existPartitionMethod(tableId)).build();
+        boolean contain;
+
+        contain = store.existDatabase(databaseName);
+
+        if (contain) {
+          contain = store.existTable(databaseName, tableName);
+          if (contain) {
+            if (store.existPartitionMethod(databaseName, tableName)) {
+              return ProtoUtil.TRUE;
+            } else {
+              return ProtoUtil.FALSE;
+            }
+          } else {
+            throw new NoSuchTableException(databaseName, tableName);
+          }
+        } else {
+          throw new NoSuchDatabaseException(databaseName);
+        }
       } catch (Exception e) {
         LOG.error(e);
-        return BoolProto.newBuilder().setValue(false).build();
+        throw new ServiceException(e);
       } finally {
         rlock.unlock();
       }
     }
 
     @Override
-    public BoolProto delPartitionMethod(RpcController controller, StringProto request)
+    public BoolProto dropPartitionMethod(RpcController controller, TableIdentifierProto request)
         throws ServiceException {
-      return null;
+      return ProtoUtil.TRUE; // Stub: the request is ignored and the store is never touched, yet TRUE is returned.
     }
 
     @Override
-    public BoolProto addPartitions(RpcController controller, PartitionsProto request)
-        throws ServiceException {
+    public BoolProto addPartitions(RpcController controller, PartitionsProto request) throws ServiceException {
 
-      return null;
+      return ProtoUtil.TRUE; // Stub: the request is ignored and the store is never touched, yet TRUE is returned.
     }
 
     @Override
-    public BoolProto addPartition(RpcController controller, PartitionDescProto request)
-        throws ServiceException {
-      return null;
+    public BoolProto addPartition(RpcController controller, PartitionDescProto request) throws ServiceException {
+      return ProtoUtil.TRUE; // Stub: the request is ignored and the store is never touched, yet TRUE is returned.
     }
 
     @Override
-    public PartitionDescProto getPartitionByPartitionName(RpcController controller,
-                                                          StringProto request)
+    public PartitionDescProto getPartitionByPartitionName(RpcController controller, StringProto request) // Stub: always returns null.
         throws ServiceException {
       return null;
     }
@@ -379,16 +622,18 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto addIndex(RpcController controller, IndexDescProto indexDesc)
+    public BoolProto createIndex(RpcController controller, IndexDescProto indexDesc)
         throws ServiceException {
       rlock.lock();
       try {
-        if (store.existIndex(indexDesc.getName())) {
-          throw new AlreadyExistsIndexException(indexDesc.getName());
+        if (store.existIndexByName(
+            indexDesc.getTableIdentifier().getDatabaseName(),
+            indexDesc.getIndexName())) {
+          throw new AlreadyExistsIndexException(indexDesc.getIndexName());
         }
-        store.addIndex(indexDesc);
+        store.createIndex(indexDesc);
       } catch (Exception e) {
-        LOG.error("ERROR : cannot add index " + indexDesc.getName(), e);
+        LOG.error("ERROR : cannot add index " + indexDesc.getIndexName(), e);
         LOG.error(indexDesc);
         throw new ServiceException(e);
       } finally {
@@ -399,13 +644,14 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto existIndexByName(RpcController controller,
-                                      StringProto indexName)
-        throws ServiceException {
+    public BoolProto existIndexByName(RpcController controller, IndexNameProto request) throws ServiceException {
+
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String indexName = CatalogUtil.normalizeIdentifier(request.getIndexName());
+
       rlock.lock();
       try {
-        return BoolProto.newBuilder().setValue(
-            store.existIndex(indexName.getValue())).build();
+        return store.existIndexByName(databaseName, indexName) ? ProtoUtil.TRUE : ProtoUtil.FALSE;
       } catch (Exception e) {
         LOG.error(e);
         return BoolProto.newBuilder().setValue(false).build();
@@ -415,14 +661,18 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto existIndex(RpcController controller,
-                                GetIndexRequest request)
+    public BoolProto existIndexByColumn(RpcController controller, GetIndexByColumnRequest request)
         throws ServiceException {
+
+      TableIdentifierProto identifier = request.getTableIdentifier();
+      String databaseName = CatalogUtil.normalizeIdentifier(identifier.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(identifier.getTableName());
+      String columnName = CatalogUtil.normalizeIdentifier(request.getColumnName());
+
       rlock.lock();
       try {
-        return BoolProto.newBuilder().setValue(
-            store.existIndex(request.getTableName(),
-                request.getColumnName())).build();
+        return store.existIndexByColumn(databaseName, tableName, columnName) ?
+            ProtoUtil.TRUE : ProtoUtil.FALSE;
       } catch (Exception e) {
         LOG.error(e);
         return BoolProto.newBuilder().setValue(false).build();
@@ -432,15 +682,18 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public IndexDescProto getIndexByName(RpcController controller,
-                                         StringProto indexName)
+    public IndexDescProto getIndexByName(RpcController controller, IndexNameProto request)
         throws ServiceException {
+
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String indexName = CatalogUtil.normalizeIdentifier(request.getIndexName());
+
       rlock.lock();
       try {
-        if (!store.existIndex(indexName.getValue())) {
-          throw new NoSuchIndexException(indexName.getValue());
+        if (!store.existIndexByName(databaseName, indexName)) {
+          throw new NoSuchIndexException(databaseName, indexName);
         }
-        return store.getIndex(indexName.getValue());
+        return store.getIndexByName(databaseName, indexName);
       } catch (Exception e) {
         LOG.error("ERROR : cannot get index " + indexName, e);
         return null;
@@ -450,19 +703,22 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public IndexDescProto getIndex(RpcController controller,
-                                   GetIndexRequest request)
+    public IndexDescProto getIndexByColumn(RpcController controller, GetIndexByColumnRequest request)
         throws ServiceException {
+
+      TableIdentifierProto identifier = request.getTableIdentifier();
+      String databaseName = CatalogUtil.normalizeIdentifier(identifier.getDatabaseName());
+      String tableName = CatalogUtil.normalizeIdentifier(identifier.getTableName());
+      String columnName = CatalogUtil.normalizeIdentifier(request.getColumnName());
+
       rlock.lock();
       try {
-        if (!store.existIndex(request.getTableName())) {
-          throw new NoSuchIndexException(request.getTableName() + "."
-              + request.getColumnName());
+        if (!store.existIndexByColumn(databaseName, tableName, columnName)) {
+          throw new NoSuchIndexException(databaseName, columnName);
         }
-        return store.getIndex(request.getTableName(), request.getColumnName());
+        return store.getIndexByColumn(databaseName, tableName, columnName);
       } catch (Exception e) {
-        LOG.error("ERROR : cannot get index " + request.getTableName() + "."
-            + request.getColumnName(), e);
+        LOG.error("ERROR : cannot get index for " + tableName + "." + columnName, e);
         return null;
       } finally {
         rlock.unlock();
@@ -470,14 +726,18 @@ public class CatalogServer extends AbstractService {
     }
 
     @Override
-    public BoolProto delIndex(RpcController controller, StringProto indexName)
+    public BoolProto dropIndex(RpcController controller, IndexNameProto request)
         throws ServiceException {
+
+      String databaseName = CatalogUtil.normalizeIdentifier(request.getDatabaseName());
+      String indexName = CatalogUtil.normalizeIdentifier(request.getIndexName());
+
       wlock.lock();
       try {
-        if (!store.existIndex(indexName.getValue())) {
-          throw new NoSuchIndexException(indexName.getValue());
+        if (!store.existIndexByName(databaseName, indexName)) {
+          throw new NoSuchIndexException(indexName);
         }
-        store.delIndex(indexName.getValue());
+        store.dropIndex(databaseName, indexName);
       } catch (Exception e) {
         LOG.error(e);
       } finally {