You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:38 UTC

[44/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java b/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
deleted file mode 100644
index d679c69..0000000
--- a/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog.statistics;
-
-import org.junit.Test;
-import tajo.catalog.Column;
-import tajo.common.TajoDataTypes.Type;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestTableStat {
-  @Test
-  public final void testTableStat() throws CloneNotSupportedException {
-    TableStat stat = new TableStat();
-    stat.setNumRows(957685);
-    stat.setNumBytes(1023234);
-    stat.setNumBlocks(3123);
-    stat.setNumPartitions(5);
-    stat.setAvgRows(80000);
-        
-    int numCols = 3;
-    ColumnStat[] cols = new ColumnStat[numCols];
-    for (int i = 0; i < numCols; i++) {
-      cols[i] = new ColumnStat(new Column("col_" + i, Type.INT8));
-      cols[i].setNumDistVals(1024 * i);
-      cols[i].setNumNulls(100 * i);
-      stat.addColumnStat(cols[i]);
-    }
-    
-    assertTrue(957685 == stat.getNumRows());
-    assertTrue(1023234 == stat.getNumBytes());
-    assertTrue(3123 == stat.getNumBlocks());
-    assertTrue(5 == stat.getNumPartitions());
-    assertTrue(80000 == stat.getAvgRows());
-    assertEquals(3, stat.getColumnStats().size());
-    for (int i = 0; i < numCols; i++) {
-      assertEquals(cols[i], stat.getColumnStats().get(i));
-    }
-    
-    TableStat stat2 = new TableStat(stat.getProto());
-    tableStatEquals(stat, stat2);
-    
-    TableStat stat3 = (TableStat) stat.clone();
-    tableStatEquals(stat, stat3);    
-  }
-  
-  public void tableStatEquals(TableStat s1, TableStat s2) {
-    assertEquals(s1.getNumRows(), s2.getNumRows());
-    assertEquals(s1.getNumBlocks(), s2.getNumBlocks());
-    assertEquals(s1.getNumPartitions(), s2.getNumPartitions());
-    assertEquals(s1.getAvgRows(), s2.getAvgRows());
-    assertEquals(s1.getColumnStats().size(), s2.getColumnStats().size());
-    for (int i = 0; i < s1.getColumnStats().size(); i++) {
-      assertEquals(s1.getColumnStats().get(i), s2.getColumnStats().get(i));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
new file mode 100644
index 0000000..1cebd93
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
+import org.apache.tajo.catalog.store.CatalogStore;
+import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class provides the catalog service. The catalog service enables clients
+ * to register, unregister and access information about tables, functions, and
+ * cluster information.
+ */
+public class CatalogServer extends AbstractService {
+
+  private final static Log LOG = LogFactory.getLog(CatalogServer.class);
+  private TajoConf conf;
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Lock rlock = lock.readLock();
+  private final Lock wlock = lock.writeLock();
+
+  private CatalogStore store;
+
+  private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
+
+  // RPC variables
+  private ProtoBlockingRpcServer rpcServer;
+  private InetSocketAddress bindAddress;
+  private String serverName;
+  final CatalogProtocolHandler handler;
+
+  // Server status variables
+  private volatile boolean stopped = false;
+  @SuppressWarnings("unused")
+  private volatile boolean isOnline = false;
+
+  private static BoolProto BOOL_TRUE = BoolProto.newBuilder().
+      setValue(true).build();
+  private static BoolProto BOOL_FALSE = BoolProto.newBuilder().
+      setValue(false).build();
+
+  private List<FunctionDesc> builtingFuncs;
+
+  public CatalogServer() throws IOException {
+    super(CatalogServer.class.getName());
+    this.handler = new CatalogProtocolHandler();
+    this.builtingFuncs = new ArrayList<FunctionDesc>();
+  }
+
+  public CatalogServer(List<FunctionDesc> sqlFuncs) throws IOException {
+    this();
+    this.builtingFuncs = sqlFuncs;
+  }
+
+  @Override
+  public void init(Configuration _conf) {
+    this.conf = (TajoConf) _conf;
+
+    Constructor<?> cons;
+    try {
+      Class<?> storeClass =
+          this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
+      LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
+
+      cons = storeClass.
+          getConstructor(new Class [] {Configuration.class});
+
+      this.store = (CatalogStore) cons.newInstance(this.conf);
+
+      initBuiltinFunctions(builtingFuncs);
+    } catch (Throwable t) {
+      LOG.error("cannot initialize CatalogServer", t);
+    }
+
+    super.init(conf);
+  }
+
+  private void initBuiltinFunctions(List<FunctionDesc> functions)
+      throws ServiceException {
+    for (FunctionDesc desc : functions) {
+      handler.registerFunction(null, desc.getProto());
+    }
+  }
+
+  public void start() {
+    // Server to handle client requests.
+    String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
+    // Creating the socket address forces the host name to be resolved.
+    InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
+    try {
+      this.rpcServer =
+          new ProtoBlockingRpcServer(CatalogProtocol.class, handler, initIsa);
+      this.rpcServer.start();
+      this.bindAddress = this.rpcServer.getBindAddress();
+      this.serverName = NetUtils.getIpPortString(bindAddress);
+      conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
+    } catch (Exception e) {
+      LOG.error("Cannot start RPC Server of CatalogServer", e);
+    }
+
+    LOG.info("Catalog Server startup (" + serverName + ")");
+    super.start();
+  }
+
+  public void stop() {
+    if (this.rpcServer != null) { // null if start() never created the server
+      this.rpcServer.shutdown();
+    }
+    LOG.info("Catalog Server (" + serverName + ") shutdown");
+    super.stop();
+  }
+
+  public CatalogProtocolHandler getHandler() {
+    return this.handler;
+  }
+
+  public InetSocketAddress getBindAddress() {
+    return this.bindAddress;
+  }
+
+  public class CatalogProtocolHandler
+      implements CatalogProtocolService.BlockingInterface {
+
+    @Override
+    public TableDescProto getTableDesc(RpcController controller,
+                                       StringProto name)
+        throws ServiceException {
+      rlock.lock();
+      try {
+        String tableId = name.getValue().toLowerCase();
+        if (!store.existTable(tableId)) {
+          throw new NoSuchTableException(tableId);
+        }
+        return (TableDescProto) store.getTable(tableId).getProto();
+      } catch (IOException ioe) {
+        // TODO - handle exception
+        LOG.error(ioe);
+        return null;
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public GetAllTableNamesResponse getAllTableNames(RpcController controller,
+                                                     NullProto request)
+        throws ServiceException {
+      try {
+        Iterator<String> iterator = store.getAllTableNames().iterator();
+        GetAllTableNamesResponse.Builder builder =
+            GetAllTableNamesResponse.newBuilder();
+        while (iterator.hasNext()) {
+          builder.addTableName(iterator.next());
+        }
+        return builder.build();
+      } catch (IOException ioe) {
+        // TODO - handle exception
+        return null;
+      }
+    }
+
+    @Override
+    public GetFunctionsResponse getFunctions(RpcController controller,
+                                             NullProto request)
+        throws ServiceException {
+      Iterator<FunctionDescProto> iterator = functions.values().iterator();
+      GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
+      while (iterator.hasNext()) {
+        builder.addFunctionDesc(iterator.next());
+      }
+      return builder.build();
+    }
+
+    /** Adds a table whose schema is first rewritten by getQualfiedSchema. */
+    @Override
+    public BoolProto addTable(RpcController controller, TableDescProto tableDesc)
+        throws ServiceException {
+      Preconditions.checkArgument(tableDesc.hasId(),
+          "Must be set to the table name");
+      Preconditions.checkArgument(tableDesc.hasPath(),
+          "Must be set to the table URI");
+
+      wlock.lock();
+      try {
+        if (store.existTable(tableDesc.getId())) {
+          throw new AlreadyExistsTableException(tableDesc.getId());
+        }
+
+        // rewrite schema
+        SchemaProto revisedSchema = CatalogUtil.getQualfiedSchema(
+            tableDesc.getId(), tableDesc.getMeta().getSchema());
+
+        TableProto.Builder metaBuilder = TableProto.newBuilder(tableDesc.getMeta());
+        metaBuilder.setSchema(revisedSchema);
+        TableDescProto.Builder descBuilder = TableDescProto.newBuilder(tableDesc);
+        descBuilder.setMeta(metaBuilder.build());
+        store.addTable(new TableDescImpl(descBuilder.build()));
+        // Report the addition only once the store call has succeeded.
+        LOG.info("Table " + tableDesc.getId() + " is added to the catalog ("
+            + serverName + ")");
+        return BOOL_TRUE;
+      } catch (IOException ioe) {
+        LOG.error(ioe);
+        // Propagate the failure rather than answering BOOL_TRUE.
+        throw new ServiceException(ioe);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto deleteTable(RpcController controller, StringProto name)
+        throws ServiceException {
+      wlock.lock();
+      try {
+        String tableId = name.getValue().toLowerCase();
+        if (!store.existTable(tableId)) {
+          throw new NoSuchTableException(tableId);
+        }
+        store.deleteTable(tableId);
+        return BOOL_TRUE;
+      } catch (IOException ioe) {
+        LOG.error(ioe);
+        throw new ServiceException(ioe); // a failed delete must not yield BOOL_TRUE
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existsTable(RpcController controller, StringProto name)
+        throws ServiceException {
+      try {
+        String tableId = name.getValue().toLowerCase();
+        if (store.existTable(tableId)) {
+          return BOOL_TRUE;
+        } else {
+          return BOOL_FALSE;
+        }
+      } catch (IOException e) {
+        LOG.error(e);
+        throw new ServiceException(e);
+      }
+    }
+
+    /** Adds an index; takes the write lock because the store is modified. */
+    @Override
+    public BoolProto addIndex(RpcController controller, IndexDescProto indexDesc)
+        throws ServiceException {
+      wlock.lock();
+      try {
+        if (store.existIndex(indexDesc.getName())) {
+          throw new AlreadyExistsIndexException(indexDesc.getName());
+        }
+        store.addIndex(indexDesc);
+        return BOOL_TRUE;
+      } catch (IOException ioe) {
+        LOG.error("ERROR : cannot add index " + indexDesc.getName(), ioe);
+        LOG.error(indexDesc);
+        throw new ServiceException(ioe);
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existIndexByName(RpcController controller,
+                                      StringProto indexName)
+        throws ServiceException {
+      rlock.lock();
+      try {
+        return BoolProto.newBuilder().setValue(
+            store.existIndex(indexName.getValue())).build();
+      } catch (IOException e) {
+        LOG.error(e);
+        return BoolProto.newBuilder().setValue(false).build();
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto existIndex(RpcController controller,
+                                GetIndexRequest request)
+        throws ServiceException {
+      rlock.lock();
+      try {
+        return BoolProto.newBuilder().setValue(
+            store.existIndex(request.getTableName(),
+                request.getColumnName())).build();
+      } catch (IOException e) {
+        LOG.error(e);
+        return BoolProto.newBuilder().setValue(false).build();
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public IndexDescProto getIndexByName(RpcController controller,
+                                         StringProto indexName)
+        throws ServiceException {
+      rlock.lock();
+      try {
+        if (!store.existIndex(indexName.getValue())) {
+          throw new NoSuchIndexException(indexName.getValue());
+        }
+        return store.getIndex(indexName.getValue());
+      } catch (IOException ioe) {
+        LOG.error("ERROR : cannot get index " + indexName, ioe);
+        return null;
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public IndexDescProto getIndex(RpcController controller,
+                                   GetIndexRequest request)
+        throws ServiceException {
+      rlock.lock();
+      try {
+        if (!store.existIndex(request.getTableName())) {
+          throw new NoSuchIndexException(request.getTableName() + "."
+              + request.getColumnName());
+        }
+        return store.getIndex(request.getTableName(), request.getColumnName());
+      } catch (IOException ioe) {
+        LOG.error("ERROR : cannot get index " + request.getTableName() + "."
+            + request.getColumnName(), ioe);
+        return null;
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    @Override
+    public BoolProto delIndex(RpcController controller, StringProto indexName)
+        throws ServiceException {
+      wlock.lock();
+      try {
+        if (!store.existIndex(indexName.getValue())) {
+          throw new NoSuchIndexException(indexName.getValue());
+        }
+        store.delIndex(indexName.getValue());
+        return BOOL_TRUE;
+      } catch (IOException e) {
+        LOG.error(e);
+        throw new ServiceException(e); // a failed drop must not yield BOOL_TRUE
+      } finally {
+        wlock.unlock();
+      }
+    }
+
+    /** Registers a function under its canonical name; duplicates are rejected. */
+    @Override
+    public BoolProto registerFunction(RpcController controller,
+                                      FunctionDescProto funcDesc)
+        throws ServiceException {
+      String canonicalName = CatalogUtil.getCanonicalName(
+          funcDesc.getSignature(), funcDesc.getParameterTypesList());
+      if (functions.containsKey(canonicalName)) {
+        throw new AlreadyExistsFunctionException(canonicalName);
+      }
+
+      functions.put(canonicalName, funcDesc);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Function " + canonicalName + " is registered.");
+      }
+
+      return BOOL_TRUE;
+    }
+
+    @Override
+    public BoolProto unregisterFunction(RpcController controller,
+                                        UnregisterFunctionRequest request)
+        throws ServiceException {
+      String signature = request.getSignature();
+      List<DataType> paramTypes = new ArrayList<DataType>();
+      int size = request.getParameterTypesCount();
+      for (int i = 0; i < size; i++) {
+        paramTypes.add(request.getParameterTypes(i));
+      }
+      String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
+      if (!functions.containsKey(canonicalName)) {
+        throw new NoSuchFunctionException(canonicalName);
+      }
+
+      functions.remove(canonicalName);
+      LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
+
+      return BOOL_TRUE;
+    }
+
+    @Override
+    public FunctionDescProto getFunctionMeta(RpcController controller,
+                                             GetFunctionMetaRequest request)
+        throws ServiceException {
+      List<DataType> paramTypes = new ArrayList<DataType>();
+      int size = request.getParameterTypesCount();
+      for (int i = 0; i < size; i++) {
+        paramTypes.add(request.getParameterTypes(i));
+      }
+      return functions.get(CatalogUtil.getCanonicalName(
+          request.getSignature().toLowerCase(), paramTypes));
+    }
+
+    @Override
+    public BoolProto containFunction(RpcController controller,
+                                     ContainFunctionRequest request)
+        throws ServiceException {
+      List<DataType> paramTypes = new ArrayList<DataType>();
+      int size = request.getParameterTypesCount();
+      for (int i = 0; i < size; i++) {
+        paramTypes.add(request.getParameterTypes(i));
+      }
+      boolean returnValue =
+          functions.containsKey(CatalogUtil.getCanonicalName(
+              request.getSignature().toLowerCase(), paramTypes));
+      return BoolProto.newBuilder().setValue(returnValue).build();
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TajoConf conf = new TajoConf();
+    CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());
+    catalog.init(conf);
+    catalog.start();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
new file mode 100644
index 0000000..3a98ca0
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+/**
+ * This class provides a catalog service interface backed by a
+ * CatalogServer instance held in the same process.
+ */
+public class LocalCatalog extends AbstractCatalogClient {
+  private static final Log LOG = LogFactory.getLog(LocalCatalog.class);
+  private CatalogServer catalog;
+
+  public LocalCatalog(final TajoConf conf) throws IOException {
+    this.catalog = new CatalogServer();
+    this.catalog.init(conf);
+    this.catalog.start();
+    setStub(catalog.getHandler());
+  }
+
+  public LocalCatalog(final CatalogServer server) {
+    this.catalog = server;
+    setStub(server.getHandler());
+  }
+
+  public void shutdown() {
+    this.catalog.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
new file mode 100644
index 0000000..a31f3c0
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+public class MiniCatalogServer {
+  private CatalogServer catalogServers;
+  
+  public MiniCatalogServer(TajoConf conf) throws IOException {
+    catalogServers = new CatalogServer();
+    catalogServers.init(conf);
+    catalogServers.start();
+  }
+  
+  public void shutdown() {
+    this.catalogServers.stop();
+  }
+  
+  public CatalogServer getCatalogServer() {
+    return this.catalogServers;
+  }
+  
+  public CatalogService getCatalog() {
+    return new LocalCatalog(this.catalogServers);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
new file mode 100644
index 0000000..44f4e7a
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store;
+
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public interface CatalogStore extends Closeable {
+  void addTable(TableDesc desc) throws IOException;
+  
+  boolean existTable(String name) throws IOException;
+  
+  void deleteTable(String name) throws IOException;
+  
+  TableDesc getTable(String name) throws IOException;
+  
+  List<String> getAllTableNames() throws IOException;
+  
+  void addIndex(IndexDescProto proto) throws IOException;
+  
+  void delIndex(String indexName) throws IOException;
+  
+  IndexDescProto getIndex(String indexName) throws IOException;
+  
+  IndexDescProto getIndex(String tableName, String columnName) 
+      throws IOException;
+  
+  boolean existIndex(String indexName) throws IOException;
+  
+  boolean existIndex(String tableName, String columnName) throws IOException;
+  
+  IndexDescProto [] getIndexes(String tableName) throws IOException;
+  
+  void addFunction(FunctionDesc func) throws IOException;
+  
+  void deleteFunction(FunctionDesc func) throws IOException;
+  // NOTE(review): void here, but existTable/existIndex return boolean - confirm.
+  void existFunction(FunctionDesc func) throws IOException;
+  
+  List<String> getAllFunctionNames() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
new file mode 100644
index 0000000..41b2ca1
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
@@ -0,0 +1,1033 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DBStore implements CatalogStore {
+  /** Logger of this store. */
+  private final Log LOG = LogFactory.getLog(DBStore.class); 
+  /** Configuration the JDBC driver class and URI are read from. */
+  private final Configuration conf;
+  /** JDBC driver class name. */
+  private final String driver;
+  /** JDBC URI of the catalog database. */
+  private final String jdbcUri;
+  /** The single JDBC connection shared by all operations of this store. */
+  private Connection conn;  
+
+  // Only referenced by the currently disabled upgrade step in the constructor.
+  private static final int VERSION = 1;
+
+  // Names of the catalog's own tables.
+  private static final String TB_META = "META";
+  private static final String TB_TABLES = "TABLES";
+  private static final String TB_COLUMNS = "COLUMNS";
+  private static final String TB_OPTIONS = "OPTIONS";
+  private static final String TB_INDEXES = "INDEXES";
+  private static final String TB_STATISTICS = "STATS";
+  
+  // Name of the column holding the table identifier in every catalog table.
+  private static final String C_TABLE_ID = "TABLE_ID";
+  
+  // Read operations take rlock, modifying operations take wlock.
+  private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private Lock rlock = lock.readLock();
+  private Lock wlock = lock.writeLock();
+  
+  /**
+   * Opens the catalog database described by the given configuration.
+   * <p>
+   * The constructor loads the JDBC driver, connects to the JDBC URI (with
+   * <code>;create=true</code> appended), creates the base tables when none of
+   * them exists yet, and finally reads the stored schema version (inserting
+   * version 0 when the META table is empty).
+   *
+   * @param conf configuration providing the JDBC driver class and JDBC URI
+   * @throws InternalException if the driver cannot be loaded, the connection
+   *         cannot be opened, or the base tables/version cannot be set up
+   */
+  public DBStore(final Configuration conf)
+      throws InternalException {
+    this.conf = conf;
+    this.driver = conf.get(
+        CatalogConstants.JDBC_DRIVER, CatalogConstants.DEFAULT_JDBC_DRIVER);
+    this.jdbcUri = conf.get(CatalogConstants.JDBC_URI);
+
+    // 1. load the driver
+    try {
+      Class.forName(driver).newInstance();
+      LOG.info("Loaded the JDBC driver (" + driver +")");
+    } catch (Exception loadFailure) {
+      throw new InternalException("Cannot load JDBC driver " + driver, loadFailure);
+    }
+
+    // 2. connect, creating the database if it does not exist
+    try {
+      LOG.info("Trying to connect database (" + jdbcUri + ")");
+      conn = DriverManager.getConnection(jdbcUri + ";create=true");
+      LOG.info("Connected to database (" + jdbcUri +")");
+    } catch (SQLException connectFailure) {
+      throw new InternalException("Cannot connect to database (" + jdbcUri
+          +")", connectFailure);
+    }
+
+    // 3. make sure the base tables exist
+    try {
+      if (isInitialized()) {
+        LOG.info("The base tables of CatalogServer already is initialized.");
+      } else {
+        LOG.info("The base tables of CatalogServer are created.");
+        createBaseTable();
+      }
+    } catch (SQLException initFailure) {
+      throw new InternalException(
+          "Cannot initialize the persistent storage of Catalog", initFailure);
+    }
+
+    // 4. read the schema version; needUpgrade() inserts version 0 when the
+    // META table is empty. The upgrade step that would consume the returned
+    // version (upgrade(dbVersion, VERSION)) is currently disabled.
+    try {
+      needUpgrade();
+    } catch (SQLException versionFailure) {
+      throw new InternalException(
+          "Cannot check if the DB need to be upgraded", versionFailure);
+    }
+  }
+
+  /**
+   * Reads the schema version stored in the META table. When the table holds
+   * no row yet, version 0 is inserted and returned.
+   *
+   * @return the stored schema version, or 0 if none was stored before
+   * @throws SQLException if the version cannot be read or inserted
+   */
+  private int needUpgrade() throws SQLException {
+    String sql = "SELECT VERSION FROM " + TB_META;
+    Statement stmt = conn.createStatement();
+    try {
+      ResultSet res = stmt.executeQuery(sql);
+      try {
+        if (res.next()) {
+          return res.getInt(1);
+        }
+      } finally {
+        res.close();
+      }
+    } finally {
+      // the statement (and its result set) used to be leaked
+      stmt.close();
+    }
+
+    // no row: this db version is 0
+    insertVersion();
+    return 0;
+  }
+
+  /**
+   * Inserts schema version 0 into the META table.
+   *
+   * @throws SQLException if the insert fails
+   */
+  private void insertVersion() throws SQLException {
+    String sql = "INSERT INTO " + TB_META + " values (0)";
+    Statement stmt = conn.createStatement();
+    try {
+      stmt.executeUpdate(sql);
+    } finally {
+      // close even when the insert fails
+      stmt.close();
+    }
+  }
+
+  /**
+   * Upgrades the catalog schema. Only the step from version 0 to version 1 is
+   * implemented: it drops and re-creates the index idx_options_key on the
+   * OPTIONS table. Nothing happens when <code>from</code> is not 0.
+   *
+   * @param from schema version currently stored
+   * @param to schema version to upgrade to
+   * @throws SQLException if the index cannot be dropped or re-created
+   */
+  private void upgrade(int from, int to) throws SQLException {
+    if (from != 0) {
+      return;
+    }
+
+    if (to == 1) {
+      String dropSql = "DROP INDEX idx_options_key";
+      String createSql =
+          "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+
+      Statement stmt = conn.createStatement();
+      try {
+        LOG.info(dropSql);
+        stmt.addBatch(dropSql);
+        stmt.addBatch(createSql);
+        LOG.info(createSql);
+        stmt.executeBatch();
+      } finally {
+        // close even when the batch fails
+        stmt.close();
+      }
+    }
+
+    // NOTE(review): as before, this is also logged for target versions other
+    // than 1, for which no statement is executed.
+    LOG.info("DB Upgraded from " + from + " to " + to);
+  }
+
+  // TODO - DDL and index statements should be renamed
+  /**
+   * Creates the catalog's own tables (META, TABLES, COLUMNS, OPTIONS, INDEXES
+   * and STATS) together with their indexes, holding the write lock while
+   * doing so. The tables are created one group at a time, so a failure leaves
+   * the groups created so far in place.
+   *
+   * @throws SQLException if any DDL statement fails
+   */
+  private void createBaseTable() throws SQLException {
+    wlock.lock();
+    try {
+      // META
+      executeDdlBatch(
+          "CREATE TABLE " + TB_META + " (version int NOT NULL)");
+      LOG.info("Table '" + TB_META + "' is created.");
+
+      // TABLES
+      String tablesDdl = "CREATE TABLE "
+          + TB_TABLES + " ("
+          + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+          + C_TABLE_ID + " VARCHAR(256) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+          + "path VARCHAR(1024), "
+          + "store_type CHAR(16), "
+          + "options VARCHAR(32672), "
+          + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)"
+          + ")";
+      String idxTablesTid =
+          "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+      String idxTablesName = "CREATE UNIQUE INDEX idx_tables_name on "
+          + TB_TABLES + "(" + C_TABLE_ID + ")";
+      executeDdlBatch(tablesDdl, idxTablesTid, idxTablesName);
+      LOG.info("Table '" + TB_TABLES + "' is created.");
+
+      // COLUMNS
+      String columnsDdl =
+          "CREATE TABLE " + TB_COLUMNS + " ("
+          + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+          + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES " + TB_TABLES + "("
+          + C_TABLE_ID + ") ON DELETE CASCADE, "
+          + "column_id INT NOT NULL,"
+          + "column_name VARCHAR(256) NOT NULL, " + "data_type CHAR(16), "
+          + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+      String idxColumnsTableName =
+          "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+          + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+      executeDdlBatch(columnsDdl, idxColumnsTableName);
+      LOG.info("Table '" + TB_COLUMNS + "' is created.");
+
+      // OPTIONS
+      String optionsDdl =
+          "CREATE TABLE " + TB_OPTIONS + " ("
+          + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES " + TB_TABLES
+          + " (" + C_TABLE_ID + ") "
+          + "ON DELETE CASCADE, "
+          + "key_ VARCHAR(256) NOT NULL, value_ VARCHAR(256) NOT NULL)";
+      String idxOptionsKey =
+          "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+      // NOTE(review): indexes the same column as idx_options_key; kept so the
+      // created schema stays unchanged.
+      String idxOptionsTableName =
+          "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+          + "(" + C_TABLE_ID + ")";
+      executeDdlBatch(optionsDdl, idxOptionsKey, idxOptionsTableName);
+      LOG.info("Table '" + TB_OPTIONS + "' is created.");
+
+      // INDEXES
+      String indexesDdl = "CREATE TABLE " + TB_INDEXES + "("
+          + "index_name VARCHAR(256) NOT NULL PRIMARY KEY, "
+          + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES " + TB_TABLES
+          + " (" + C_TABLE_ID + ") "
+          + "ON DELETE CASCADE, "
+          + "column_name VARCHAR(256) NOT NULL, "
+          + "data_type VARCHAR(256) NOT NULL, "
+          + "index_type CHAR(32) NOT NULL, "
+          + "is_unique BOOLEAN NOT NULL, "
+          + "is_clustered BOOLEAN NOT NULL, "
+          + "is_ascending BOOLEAN NOT NULL)";
+      String idxIndexesKey = "CREATE UNIQUE INDEX idx_indexes_key ON "
+          + TB_INDEXES + " (index_name)";
+      String idxIndexesColumns = "CREATE INDEX idx_indexes_columns ON "
+          + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+      executeDdlBatch(indexesDdl, idxIndexesKey, idxIndexesColumns);
+      LOG.info("Table '" + TB_INDEXES + "' is created.");
+
+      // STATS
+      String statsDdl = "CREATE TABLE " + TB_STATISTICS + "("
+          + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES " + TB_TABLES
+          + " (" + C_TABLE_ID + ") "
+          + "ON DELETE CASCADE, "
+          + "num_rows BIGINT, "
+          + "num_bytes BIGINT)";
+      String idxStatsTableName = "CREATE INDEX idx_stats_table_name ON "
+          + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+      executeDdlBatch(statsDdl, idxStatsTableName);
+      LOG.info("Table '" + TB_STATISTICS + "' is created.");
+
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /**
+   * Runs the given DDL statements, in order, as one JDBC batch. Each
+   * statement is written to the debug log before it is added to the batch,
+   * and the JDBC statement is always closed.
+   *
+   * @param ddls DDL statements to execute
+   * @throws SQLException if the batch fails
+   */
+  private void executeDdlBatch(final String... ddls) throws SQLException {
+    Statement stmt = conn.createStatement();
+    try {
+      for (String ddl : ddls) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ddl);
+        }
+        stmt.addBatch(ddl);
+      }
+      stmt.executeBatch();
+    } finally {
+      stmt.close();
+    }
+  }
+  
+  /**
+   * Checks whether the catalog database has already been set up, i.e. whether
+   * at least one of the META, TABLES, COLUMNS or OPTIONS tables exists.
+   *
+   * @return true if one of those tables is found in the database metadata
+   * @throws SQLException if the database metadata cannot be read
+   */
+  private boolean isInitialized() throws SQLException {
+    wlock.lock();
+    try {
+      ResultSet res = conn.getMetaData().getTables(null, null, null,
+          new String [] {"TABLE"});
+      try {
+        while (res.next()) {
+          String resName = res.getString("TABLE_NAME");
+          if (TB_META.equals(resName)
+              || TB_TABLES.equals(resName)
+              || TB_COLUMNS.equals(resName)
+              || TB_OPTIONS.equals(resName)) {
+            return true;
+          }
+        }
+        return false;
+      } finally {
+        // the metadata result set used to be leaked
+        res.close();
+      }
+    } finally {
+      wlock.unlock();
+    }
+  }
+  
+  /**
+   * Checks whether a table with exactly the given name exists in the
+   * underlying database, according to the JDBC metadata.
+   *
+   * @param tableName table name to look for (compared case-sensitively)
+   * @return true if the database metadata lists such a table
+   * @throws SQLException if the database metadata cannot be read
+   */
+  final boolean checkInternalTable(final String tableName) throws SQLException {
+    rlock.lock();
+    try {
+      ResultSet res = conn.getMetaData().getTables(null, null, null,
+          new String [] {"TABLE"});
+      try {
+        while (res.next()) {
+          if (tableName.equals(res.getString("TABLE_NAME"))) {
+            return true;
+          }
+        }
+        return false;
+      } finally {
+        // the metadata result set used to be leaked
+        res.close();
+      }
+    } finally {
+      rlock.unlock();
+    }
+  }
+  
+  /**
+   * Stores a table description: one row in TABLES, one row per column in
+   * COLUMNS, one row per option in OPTIONS and, when the table meta carries
+   * statistics, one row in STATS.
+   *
+   * @param table table description to store
+   * @throws IOException if any SQL statement fails, or if the row just
+   *         inserted into TABLES cannot be read back
+   */
+  @Override
+  public final void addTable(final TableDesc table) throws IOException {
+    Statement stmt = null;
+    ResultSet res = null;
+
+    // NOTE(review): the statements below are built by string concatenation,
+    // so identifiers, paths and option values containing a single quote break
+    // the SQL; consider PreparedStatement as used by addIndex().
+    String sql = 
+        "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+        + "VALUES('" + table.getId() + "', "
+        + "'" + table.getPath() + "', "
+        + "'" + table.getMeta().getStoreType() + "'"
+        + ")";
+
+    wlock.lock();
+    try {
+      // a single statement is reused for all steps so only one has to be closed
+      stmt = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate(sql);
+
+      // read back the generated TID of the new row
+      sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID 
+          + " = '" + table.getId() + "'";
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery(sql);
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no tid matched to " 
+            + table.getId());
+      }
+      int tid = res.getInt("TID");
+      res.close();
+      res = null;
+
+      String colSql;
+      int columnId = 0;
+      for (Column col : table.getMeta().getSchema().getColumns()) {
+        colSql = columnToSQL(tid, table, columnId, col);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(colSql);
+        }
+        stmt.addBatch(colSql);
+        columnId++;
+      }
+
+      Iterator<Entry<String,String>> it = table.getMeta().getOptions();
+      String optSql;
+      while (it.hasNext()) {
+        optSql = keyvalToSQL(table, it.next());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(optSql);
+        }
+        stmt.addBatch(optSql);
+      }
+      stmt.executeBatch();
+
+      if (table.getMeta().getStat() != null) {
+        sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+            + "VALUES ('" + table.getId() + "', "
+            + table.getMeta().getStat().getNumRows() + ","
+            + table.getMeta().getStat().getNumBytes() + ")";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt.executeUpdate(sql);
+      }
+    } catch (SQLException se) {
+      throw new IOException(se.getMessage(), se);
+    } finally {
+      try {
+        // both may be null when the failure happened early
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        wlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Builds the INSERT statement that stores one column of a table in the
+   * COLUMNS table. The values are embedded into the SQL text as they are.
+   *
+   * @param tid generated TID of the owning row in TABLES
+   * @param desc table the column belongs to
+   * @param columnId position of the column within the table
+   * @param col column to store
+   * @return the INSERT statement
+   */
+  private String columnToSQL(final int tid, final TableDesc desc, 
+      final int columnId, final Column col) {
+    StringBuilder insert = new StringBuilder();
+    insert.append("INSERT INTO ").append(TB_COLUMNS);
+    insert.append(" (tid, ").append(C_TABLE_ID)
+        .append(", column_id, column_name, data_type) ");
+    insert.append("VALUES(");
+    insert.append(tid).append(",");
+    insert.append("'").append(desc.getId()).append("',");
+    insert.append(columnId).append(", ");
+    insert.append("'").append(col.getColumnName()).append("',");
+    insert.append("'").append(col.getDataType().getType().name()).append("'");
+    insert.append(")");
+    return insert.toString();
+  }
+  
+  /**
+   * Builds the INSERT statement that stores one option (key/value pair) of a
+   * table in the OPTIONS table. The values are embedded into the SQL text as
+   * they are.
+   *
+   * @param desc table the option belongs to
+   * @param keyVal option key and value
+   * @return the INSERT statement
+   */
+  private String keyvalToSQL(final TableDesc desc,
+      final Entry<String,String> keyVal) {
+    StringBuilder insert = new StringBuilder();
+    insert.append("INSERT INTO ").append(TB_OPTIONS);
+    insert.append(" (").append(C_TABLE_ID).append(", key_, value_) ");
+    insert.append("VALUES(");
+    insert.append("'").append(desc.getId()).append("',");
+    insert.append("'").append(keyVal.getKey()).append("',");
+    insert.append("'").append(keyVal.getValue()).append("'");
+    insert.append(")");
+    return insert.toString();
+  }
+  
+  /**
+   * Checks whether a table with the given name is stored in TABLES.
+   *
+   * @param name table name to look for
+   * @return true if a row for that name exists
+   * @throws IOException if the query fails
+   */
+  @Override
+  public final boolean existTable(final String name) throws IOException {
+    // the name is bound as a parameter instead of being pasted into the SQL,
+    // so names containing a quote can no longer break or alter the statement
+    String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES
+        + " WHERE " + C_TABLE_ID + " = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    rlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, name);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Removes a table from the catalog: its rows in COLUMNS, OPTIONS and STATS
+   * are deleted first, then its row in TABLES. The deletes are executed one
+   * after another; a failure aborts the remaining ones.
+   *
+   * @param name name of the table to remove
+   * @throws IOException if one of the DELETE statements fails
+   */
+  @Override
+  public final void deleteTable(final String name) throws IOException {
+    // acquire the lock before entering try, so that the finally block never
+    // unlocks a lock that was not taken
+    wlock.lock();
+    try {
+      deleteRowsByTableId(TB_COLUMNS, name);
+      deleteRowsByTableId(TB_OPTIONS, name);
+      deleteRowsByTableId(TB_STATISTICS, name);
+      deleteRowsByTableId(TB_TABLES, name);
+    } finally {
+      wlock.unlock();
+    }
+  }
+
+  /**
+   * Deletes all rows of one catalog table that belong to the given table id.
+   * The id is bound as a statement parameter.
+   *
+   * @param catalogTable catalog table to delete from (one of the TB_* names)
+   * @param tableId value of the table id column to match
+   * @throws IOException if the DELETE statement fails
+   */
+  private void deleteRowsByTableId(final String catalogTable,
+      final String tableId) throws IOException {
+    String sql = "DELETE FROM " + catalogTable
+        + " WHERE " + C_TABLE_ID + " = ?";
+    LOG.info(sql + " [" + tableId + "]");
+
+    PreparedStatement stmt = null;
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableId);
+      stmt.executeUpdate();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      if (stmt != null) {
+        try {
+          stmt.close();
+        } catch (SQLException ignored) {
+          // best effort: nothing useful can be done when closing fails
+        }
+      }
+    }
+  }
+
+  /**
+   * Loads a table description: the row in TABLES, the columns in COLUMNS
+   * (ordered by column_id), the options in OPTIONS and, if present, the
+   * statistics in STATS.
+   *
+   * @param name name of the table to load
+   * @return the table description, or null if TABLES has no row for the name
+   * @throws IOException if one of the queries fails
+   */
+  @Override
+  public final TableDesc getTable(final String name) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    String tableName = null;
+    Path path = null;
+    StoreType storeType = null;
+    Options options;
+    TableStat stat = null;
+
+    rlock.lock();
+    try {
+      // 1. the table itself
+      try {
+        String sql = 
+            "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+            + " WHERE " + C_TABLE_ID + "=?";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt = conn.prepareStatement(sql);
+        stmt.setString(1, name);
+        res = stmt.executeQuery();
+        if (!res.next()) { // there is no table of the given name.
+          return null;
+        }
+        tableName = res.getString(C_TABLE_ID).trim();
+        path = new Path(res.getString("path").trim());
+        storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+      } finally {
+        closeTableQuery(res, stmt);
+        res = null;
+        stmt = null;
+      }
+
+      // 2. its columns, in declaration order
+      Schema schema = new Schema();
+      try {
+        String sql = "SELECT column_name, data_type from " + TB_COLUMNS
+            + " WHERE " + C_TABLE_ID + "=? ORDER by column_id asc";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt = conn.prepareStatement(sql);
+        stmt.setString(1, name);
+        res = stmt.executeQuery();
+
+        while (res.next()) {
+          // column names are qualified with the table name
+          String columnName = tableName + "." 
+              + res.getString("column_name").trim();
+          Type dataType = getDataType(res.getString("data_type")
+              .trim());
+          schema.addColumn(columnName, dataType);
+        }
+      } finally {
+        closeTableQuery(res, stmt);
+        res = null;
+        stmt = null;
+      }
+
+      // 3. its options
+      options = Options.create();
+      try {
+        String sql = "SELECT key_, value_ from " + TB_OPTIONS
+            + " WHERE " + C_TABLE_ID + "=?";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt = conn.prepareStatement(sql);
+        stmt.setString(1, name);
+        res = stmt.executeQuery();
+
+        while (res.next()) {
+          options.put(
+              res.getString("key_"),
+              res.getString("value_"));
+        }
+      } finally {
+        closeTableQuery(res, stmt);
+        res = null;
+        stmt = null;
+      }
+
+      // 4. its statistics, if any were stored
+      try {
+        String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+            + " WHERE " + C_TABLE_ID + "=?";
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(sql);
+        }
+        stmt = conn.prepareStatement(sql);
+        stmt.setString(1, name);
+        res = stmt.executeQuery();
+
+        if (res.next()) {
+          stat = new TableStat();
+          stat.setNumRows(res.getLong("num_rows"));
+          stat.setNumBytes(res.getLong("num_bytes"));
+        }
+      } finally {
+        closeTableQuery(res, stmt);
+        res = null;
+        stmt = null;
+      }
+
+      TableMeta meta = new TableMetaImpl(schema, storeType, options);
+      if (stat != null) {
+        meta.setStat(stat);
+      }
+      return new TableDescImpl(tableName, meta, path);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      rlock.unlock();
+    }
+  }
+
+  /**
+   * Closes the result set and statement of one getTable() query. Either
+   * argument may be null (the query may have failed before it was created);
+   * failures while closing are ignored so that they cannot mask the original
+   * error.
+   */
+  private void closeTableQuery(final ResultSet res, final Statement stmt) {
+    if (res != null) {
+      try {
+        res.close();
+      } catch (SQLException ignored) {
+        // best effort: nothing useful can be done when closing fails
+      }
+    }
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException ignored) {
+        // best effort: nothing useful can be done when closing fails
+      }
+    }
+  }
+  
+  /**
+   * Maps the name of a data type, as stored in the catalog tables, to the
+   * corresponding {@link Type} constant.
+   *
+   * @param typeStr name of a Type constant
+   * @return the matching constant, or null (after logging an error) if no
+   *         constant has that name
+   */
+  private Type getDataType(final String typeStr) {
+    Type matched;
+    try {
+      matched = Type.valueOf(typeStr);
+    } catch (IllegalArgumentException unknownName) {
+      LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+      matched = null;
+    }
+    return matched;
+  }
+  
+  /**
+   * Lists the names of all tables stored in TABLES.
+   *
+   * @return the (trimmed) table names; empty if no table is stored
+   * @throws IOException if the query fails
+   */
+  @Override
+  public final List<String> getAllTableNames() throws IOException {
+    final String query = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+    final List<String> names = new ArrayList<String>();
+
+    Statement statement = null;
+    rlock.lock();
+    try {
+      statement = conn.createStatement();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(query);
+      }
+      ResultSet rows = statement.executeQuery(query);
+      while (rows.next()) {
+        names.add(rows.getString(C_TABLE_ID).trim());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      rlock.unlock();
+      if (statement != null) {
+        try {
+          statement.close();
+        } catch (SQLException e) {
+        }
+      }
+    }
+    return names;
+  }
+  
+  public final void addIndex(final IndexDescProto proto) throws IOException {
+    String sql = 
+        "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, " 
+        +"data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+        +"(?,?,?,?,?,?,?,?)";
+    
+    PreparedStatement stmt = null;
+
+    wlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, proto.getName());
+      stmt.setString(2, proto.getTableId());
+      stmt.setString(3, proto.getColumn().getColumnName());
+      stmt.setString(4, proto.getColumn().getDataType().getType().name());
+      stmt.setString(5, proto.getIndexMethod().toString());
+      stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+      stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+      stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+      stmt.executeUpdate();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      wlock.unlock();
+      try {
+        if (stmt != null) {
+          stmt.close();
+        }
+      } catch (SQLException e) {     
+      }
+    }
+  }
+  
+  /**
+   * Removes the index with the given name from the INDEXES table. Nothing
+   * happens when no such index is stored.
+   *
+   * @param indexName name of the index to remove
+   * @throws IOException if the DELETE fails
+   */
+  @Override
+  public final void delIndex(final String indexName) throws IOException {
+    // the name is bound as a parameter instead of being pasted into the SQL
+    String sql =
+        "DELETE FROM " + TB_INDEXES
+        + " WHERE index_name = ?";
+
+    PreparedStatement stmt = null;
+    wlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      stmt.executeUpdate();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        wlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Loads the description of the index with the given name.
+   *
+   * @param indexName name of the index
+   * @return the index description
+   * @throws IOException if no index has that name, or if the query fails
+   *         (query failures used to be swallowed, returning null)
+   */
+  @Override
+  public final IndexDescProto getIndex(final String indexName) 
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    rlock.lock();
+    try {
+      String sql = 
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES
+          + " where index_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(stmt.toString());
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " + indexName);
+      }
+      return resultToProto(res);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Loads the description of the index defined on the given table column.
+   *
+   * @param tableName name of the indexed table
+   * @param columnName name of the indexed column
+   * @return the index description
+   * @throws IOException if no index is defined on that column, or if the
+   *         query fails (the exception for a failed query used to be created
+   *         but never thrown)
+   */
+  @Override
+  public final IndexDescProto getIndex(final String tableName, 
+      final String columnName) throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    rlock.lock();
+    try {
+      String sql = 
+          "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES
+          + " where " + C_TABLE_ID + " = ? AND column_name = ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      if (!res.next()) {
+        throw new IOException("ERROR: there is no index matched to " 
+            + tableName + "." + columnName);
+      }
+      return resultToProto(res);
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Checks whether an index with the given name is stored in INDEXES.
+   *
+   * @param indexName index name to look for
+   * @return true if a row for that name exists
+   * @throws IOException if the query fails (failures used to be swallowed,
+   *         reporting the index as absent)
+   */
+  @Override
+  public final boolean existIndex(final String indexName) throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES 
+        + " WHERE index_name = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    rlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, indexName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        // stmt is null when prepareStatement() itself failed
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Checks whether an index on the given table column is stored in INDEXES.
+   *
+   * @param tableName name of the indexed table
+   * @param columnName name of the indexed column
+   * @return true if such an index exists
+   * @throws IOException if the query fails (failures used to be swallowed,
+   *         reporting the index as absent)
+   */
+  @Override
+  public boolean existIndex(String tableName, String columnName)
+      throws IOException {
+    String sql = "SELECT index_name from " + TB_INDEXES 
+        + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+
+    PreparedStatement stmt = null;
+    ResultSet res = null;
+    rlock.lock();
+    try {
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      stmt.setString(2, columnName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      return res.next();
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        // stmt is null when prepareStatement() itself failed
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+  }
+  
+  /**
+   * Loads the descriptions of all indexes defined on the given table.
+   *
+   * @param tableName name of the table
+   * @return the index descriptions; an empty array if the table has none
+   * @throws IOException if the query fails (failures used to be swallowed,
+   *         returning the descriptions read so far)
+   */
+  @Override
+  public final IndexDescProto [] getIndexes(final String tableName) 
+      throws IOException {
+    ResultSet res = null;
+    PreparedStatement stmt = null;
+
+    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+
+    rlock.lock();
+    try {
+      String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, " 
+          + "index_type, is_unique, is_clustered, is_ascending FROM " + TB_INDEXES
+          + " where " + C_TABLE_ID + "= ?";
+      stmt = conn.prepareStatement(sql);
+      stmt.setString(1, tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sql);
+      }
+      res = stmt.executeQuery();
+      while (res.next()) {
+        protos.add(resultToProto(res));
+      }
+    } catch (SQLException se) {
+      throw new IOException(se);
+    } finally {
+      try {
+        if (res != null) {
+          try {
+            res.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+        if (stmt != null) {
+          try {
+            stmt.close();
+          } catch (SQLException ignored) {
+            // best effort: nothing useful can be done when closing fails
+          }
+        }
+      } finally {
+        rlock.unlock();
+      }
+    }
+
+    return protos.toArray(new IndexDescProto [protos.size()]);
+  }
+  
+  /**
+   * Converts the current row of a query on the indexes table into an index
+   * description.
+   *
+   * @param res result set positioned on the row to convert
+   * @return the index description built from that row
+   * @throws SQLException if a column cannot be read
+   */
+  private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+    return IndexDescProto.newBuilder()
+        .setName(res.getString("index_name"))
+        .setTableId(res.getString(C_TABLE_ID))
+        .setColumn(resultToColumnProto(res))
+        .setIndexMethod(getIndexMethod(res.getString("index_type").trim()))
+        .setIsUnique(res.getBoolean("is_unique"))
+        .setIsClustered(res.getBoolean("is_clustered"))
+        .setIsAscending(res.getBoolean("is_ascending"))
+        .build();
+  }
+  
+  /**
+   * Builds the column description (name and data type) of the indexed column
+   * from the current row of a query on the indexes table.
+   *
+   * @param res result set positioned on the row to convert
+   * @return the column description built from that row
+   * @throws SQLException if a column cannot be read
+   */
+  private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+    ColumnProto.Builder column = ColumnProto.newBuilder();
+    String columnName = res.getString("column_name");
+    column.setColumnName(columnName);
+
+    Type type = getDataType(res.getString("data_type").trim());
+    column.setDataType(CatalogUtil.newDataTypeWithoutLen(type));
+
+    return column.build();
+  }
+  
+  /**
+   * Maps the stored name of an index method to the corresponding
+   * {@link IndexMethod} constant. Only TWO_LEVEL_BIN_TREE is recognized.
+   *
+   * @param typeStr stored name of the index method
+   * @return the matching constant, or null (after logging an error) if the
+   *         name is not recognized
+   */
+  private IndexMethod getIndexMethod(final String typeStr) {
+    boolean recognized = typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString());
+    if (!recognized) {
+      LOG.error("Cannot find a matched type aginst from '"
+          + typeStr + "'");
+      // TODO - needs exception handling
+      return null;
+    }
+    return IndexMethod.TWO_LEVEL_BIN_TREE;
+  }
+  
+  /**
+   * Not implemented yet: the given function description is ignored and
+   * nothing is stored.
+   */
+  @Override
+  public final void addFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  /**
+   * Not implemented yet: the given function description is ignored and
+   * nothing is deleted.
+   */
+  @Override
+  public final void deleteFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  /**
+   * Not implemented yet: does nothing. The method returns void, so it cannot
+   * report whether the function exists.
+   */
+  @Override
+  public final void existFunction(final FunctionDesc func) throws IOException {
+    // TODO - not implemented yet    
+  }
+
+  /**
+   * Not implemented yet.
+   *
+   * @return always null (not an empty list), so callers must null-check
+   */
+  @Override
+  public final List<String> getAllFunctionNames() throws IOException {
+    // TODO - not implemented yet
+    return null;
+  }
+
+  /**
+   * Shuts down the embedded Derby system by connecting to
+   * <code>jdbc:derby:;shutdown=true</code>. This method never throws; an
+   * unexpected failure is logged as a warning.
+   */
+  @Override
+  public final void close() {
+    try {
+      DriverManager.getConnection("jdbc:derby:;shutdown=true");
+    } catch (SQLException e) {
+      // Derby signals a successful system shutdown by throwing an
+      // SQLException with SQLState XJ015; anything else is a real problem
+      // and is reported instead of being silently dropped.
+      if (!"XJ015".equals(e.getSQLState())) {
+        LOG.warn("Cannot shutdown database (" + jdbcUri + "): "
+            + e.getMessage(), e);
+      }
+    }
+
+    LOG.info("Shutdown database (" + jdbcUri + ")");
+  }
+  
+  
+  /**
+   * Creates a store from a default {@link TajoConf}, which connects to the
+   * configured database and sets up the base tables if necessary.
+   *
+   * @param args ignored
+   */
+  public static void main(final String[] args) throws IOException {
+    // only the side effects of the constructor are of interest
+    new DBStore(new TajoConf());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
new file mode 100644
index 0000000..4aacefd
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * 
+ */
+package org.apache.tajo.catalog.store;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * In-memory {@link CatalogStore} backed by hash maps; nothing is persisted.
+ * Table state is guarded by the monitor of {@code tables}, both index maps by
+ * the monitor of {@code indexes}. Function storage is not implemented yet.
+ */
+public class MemStore implements CatalogStore {
+  /** Table descriptors keyed by table id. */
+  private final Map<String, TableDesc> tables = Maps.newHashMap();
+  /** Reserved for function descriptors; nothing is stored here yet. */
+  private final Map<String, FunctionDesc> functions = Maps.newHashMap();
+  /** Index descriptors keyed by index name. */
+  private final Map<String, IndexDescProto> indexes = Maps.newHashMap();
+  /** Index descriptors keyed by "tableId.columnName". */
+  private final Map<String, IndexDescProto> indexesByColumn = Maps.newHashMap();
+
+  /** Creates an empty store; {@code conf} is currently unused. */
+  public MemStore(Configuration conf) {
+  }
+
+  /** Drops every entry, including the by-column index entries. */
+  @Override
+  public void close() throws IOException {
+    synchronized (tables) {
+      tables.clear();
+    }
+    functions.clear();
+    synchronized (indexes) {
+      indexes.clear();
+      indexesByColumn.clear();
+    }
+  }
+
+  /** Registers {@code desc} under its id, replacing any previous entry. */
+  @Override
+  public void addTable(TableDesc desc) throws IOException {
+    synchronized (tables) {
+      tables.put(desc.getId(), desc);
+    }
+  }
+
+  /** Returns whether a table is registered under {@code name}. */
+  @Override
+  public boolean existTable(String name) throws IOException {
+    synchronized (tables) {
+      return tables.containsKey(name);
+    }
+  }
+
+  /** Removes the table registered under {@code name}, if any. */
+  @Override
+  public void deleteTable(String name) throws IOException {
+    synchronized (tables) {
+      tables.remove(name);
+    }
+  }
+
+  /** Returns the table registered under {@code name}, or {@code null}. */
+  @Override
+  public TableDesc getTable(String name) throws IOException {
+    synchronized (tables) {
+      return tables.get(name);
+    }
+  }
+
+  /** Returns a snapshot copy of the registered table ids. */
+  @Override
+  public List<String> getAllTableNames() throws IOException {
+    synchronized (tables) {
+      return new ArrayList<String>(tables.keySet());
+    }
+  }
+
+  /** Registers {@code proto} by index name and by "tableId.columnName". */
+  @Override
+  public void addIndex(IndexDescProto proto) throws IOException {
+    synchronized (indexes) {
+      indexes.put(proto.getName(), proto);
+      indexesByColumn.put(proto.getTableId() + "."
+          + proto.getColumn().getColumnName(), proto);
+    }
+  }
+
+  /** Removes the index named {@code indexName} from both index maps. */
+  @Override
+  public void delIndex(String indexName) throws IOException {
+    synchronized (indexes) {
+      IndexDescProto removed = indexes.remove(indexName);
+      if (removed != null) {
+        String key = removed.getTableId() + "."
+            + removed.getColumn().getColumnName();
+        IndexDescProto byColumn = indexesByColumn.get(key);
+        // Keep the entry if a different index has since claimed the column.
+        if (byColumn != null && byColumn.getName().equals(indexName)) {
+          indexesByColumn.remove(key);
+        }
+      }
+    }
+  }
+
+  /** Returns the index named {@code indexName}, or {@code null}. */
+  @Override
+  public IndexDescProto getIndex(String indexName) throws IOException {
+    synchronized (indexes) {
+      return indexes.get(indexName);
+    }
+  }
+
+  /** Returns the index registered for the given column, or {@code null}. */
+  @Override
+  public IndexDescProto getIndex(String tableName, String columnName)
+      throws IOException {
+    synchronized (indexes) {
+      return indexesByColumn.get(tableName + "." + columnName);
+    }
+  }
+
+  /** Returns whether an index named {@code indexName} is registered. */
+  @Override
+  public boolean existIndex(String indexName) throws IOException {
+    synchronized (indexes) {
+      return indexes.containsKey(indexName);
+    }
+  }
+
+  /** Returns whether an index is registered for the given column. */
+  @Override
+  public boolean existIndex(String tableName, String columnName)
+      throws IOException {
+    synchronized (indexes) {
+      return indexesByColumn.containsKey(tableName + "." + columnName);
+    }
+  }
+
+  /** Returns the by-column entries whose table id is {@code tableName}. */
+  @Override
+  public IndexDescProto[] getIndexes(String tableName) throws IOException {
+    List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+    synchronized (indexes) {
+      for (IndexDescProto proto : indexesByColumn.values()) {
+        if (proto.getTableId().equals(tableName)) {
+          protos.add(proto);
+        }
+      }
+    }
+    return protos.toArray(new IndexDescProto[protos.size()]);
+  }
+
+  /** Not implemented yet; the call is a no-op. */
+  @Override
+  public void addFunction(FunctionDesc func) throws IOException {
+    // to be implemented
+  }
+
+  /** Not implemented yet; the call is a no-op. */
+  @Override
+  public void deleteFunction(FunctionDesc func) throws IOException {
+    // to be implemented
+  }
+
+  /** Not implemented yet; the call is a no-op and reports nothing. */
+  @Override
+  public void existFunction(FunctionDesc func) throws IOException {
+    // to be implemented
+  }
+
+  /** Not implemented yet; always returns {@code null}. */
+  @Override
+  public List<String> getAllFunctionNames() throws IOException {
+    // to be implemented
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
deleted file mode 100644
index 8587b1e..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.service.AbstractService;
-import tajo.catalog.CatalogProtocol.CatalogProtocolService;
-import tajo.catalog.exception.*;
-import tajo.catalog.proto.CatalogProtos.*;
-import tajo.catalog.store.CatalogStore;
-import tajo.catalog.store.DBStore;
-import tajo.common.TajoDataTypes.DataType;
-import tajo.conf.TajoConf;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.rpc.ProtoBlockingRpcServer;
-import tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import tajo.util.NetUtils;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * This class provides the catalog service. The catalog service enables clients
- * to register, unregister and access information about tables, functions, and
- * cluster information.
- */
-public class CatalogServer extends AbstractService {
-
-  private final static Log LOG = LogFactory.getLog(CatalogServer.class);
-  private TajoConf conf;
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Lock rlock = lock.readLock();
-  private final Lock wlock = lock.writeLock();
-
-  private CatalogStore store;
-
-  private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
-
-  // RPC variables
-  private ProtoBlockingRpcServer rpcServer;
-  private InetSocketAddress bindAddress;
-  private String serverName;
-  final CatalogProtocolHandler handler;
-
-  // Server status variables
-  private volatile boolean stopped = false;
-  @SuppressWarnings("unused")
-  private volatile boolean isOnline = false;
-
-  private static BoolProto BOOL_TRUE = BoolProto.newBuilder().
-      setValue(true).build();
-  private static BoolProto BOOL_FALSE = BoolProto.newBuilder().
-      setValue(false).build();
-
-  private List<FunctionDesc> builtingFuncs;
-
-  public CatalogServer() throws IOException {
-    super(CatalogServer.class.getName());
-    this.handler = new CatalogProtocolHandler();
-    this.builtingFuncs = new ArrayList<FunctionDesc>();
-  }
-
-  public CatalogServer(List<FunctionDesc> sqlFuncs) throws IOException {
-    this();
-    this.builtingFuncs = sqlFuncs;
-  }
-
-  @Override
-  public void init(Configuration _conf) {
-    this.conf = (TajoConf) _conf;
-
-    Constructor<?> cons;
-    try {
-      Class<?> storeClass =
-          this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
-      LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
-
-      cons = storeClass.
-          getConstructor(new Class [] {Configuration.class});
-
-      this.store = (CatalogStore) cons.newInstance(this.conf);
-
-      initBuiltinFunctions(builtingFuncs);
-    } catch (Throwable t) {
-      LOG.error("cannot initialize CatalogServer", t);
-    }
-
-    super.init(conf);
-  }
-
-  private void initBuiltinFunctions(List<FunctionDesc> functions)
-      throws ServiceException {
-    for (FunctionDesc desc : functions) {
-      handler.registerFunction(null, desc.getProto());
-    }
-  }
-
-  public void start() {
-    // Server to handle client requests.
-    String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
-    // Creation of a HSA will force a resolve.
-    InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
-    try {
-      this.rpcServer = new ProtoBlockingRpcServer(
-          CatalogProtocol.class,
-          handler, initIsa);
-      this.rpcServer.start();
-
-      this.bindAddress = this.rpcServer.getBindAddress();
-      this.serverName = tajo.util.NetUtils.getIpPortString(bindAddress);
-      conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
-    } catch (Exception e) {
-      LOG.error("Cannot start RPC Server of CatalogServer", e);
-    }
-
-    LOG.info("Catalog Server startup (" + serverName + ")");
-    super.start();
-  }
-
-  public void stop() {
-    this.rpcServer.shutdown();
-    LOG.info("Catalog Server (" + serverName + ") shutdown");
-    super.stop();
-  }
-
-  public CatalogProtocolHandler getHandler() {
-    return this.handler;
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddress;
-  }
-
-  public class CatalogProtocolHandler
-      implements CatalogProtocolService.BlockingInterface {
-
-    @Override
-    public TableDescProto getTableDesc(RpcController controller,
-                                       StringProto name)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        String tableId = name.getValue().toLowerCase();
-        if (!store.existTable(tableId)) {
-          throw new NoSuchTableException(tableId);
-        }
-        return (TableDescProto) store.getTable(tableId).getProto();
-      } catch (IOException ioe) {
-        // TODO - handle exception
-        LOG.error(ioe);
-        return null;
-      } finally {
-        rlock.unlock();
-      }
-    }
-
-    @Override
-    public GetAllTableNamesResponse getAllTableNames(RpcController controller,
-                                                     NullProto request)
-        throws ServiceException {
-      try {
-        Iterator<String> iterator = store.getAllTableNames().iterator();
-        GetAllTableNamesResponse.Builder builder =
-            GetAllTableNamesResponse.newBuilder();
-        while (iterator.hasNext()) {
-          builder.addTableName(iterator.next());
-        }
-        return builder.build();
-      } catch (IOException ioe) {
-        // TODO - handle exception
-        return null;
-      }
-    }
-
-    @Override
-    public GetFunctionsResponse getFunctions(RpcController controller,
-                                             NullProto request)
-        throws ServiceException {
-      Iterator<FunctionDescProto> iterator = functions.values().iterator();
-      GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
-      while (iterator.hasNext()) {
-        builder.addFunctionDesc(iterator.next());
-      }
-      return builder.build();
-    }
-
-    @Override
-    public BoolProto addTable(RpcController controller, TableDescProto tableDesc)
-        throws ServiceException {
-      Preconditions.checkArgument(tableDesc.hasId(),
-          "Must be set to the table name");
-      Preconditions.checkArgument(tableDesc.hasPath(),
-          "Must be set to the table URI");
-
-      wlock.lock();
-      try {
-        if (store.existTable(tableDesc.getId())) {
-          throw new AlreadyExistsTableException(tableDesc.getId());
-        }
-
-        // rewrite schema
-        SchemaProto revisedSchema =
-            CatalogUtil.getQualfiedSchema(tableDesc.getId(), tableDesc.getMeta()
-                .getSchema());
-
-        TableProto.Builder metaBuilder = TableProto.newBuilder(tableDesc.getMeta());
-        metaBuilder.setSchema(revisedSchema);
-        TableDescProto.Builder descBuilder = TableDescProto.newBuilder(tableDesc);
-        descBuilder.setMeta(metaBuilder.build());
-
-        store.addTable(new TableDescImpl(descBuilder.build()));
-
-      } catch (IOException ioe) {
-        LOG.error(ioe);
-      } finally {
-        wlock.unlock();
-        LOG.info("Table " + tableDesc.getId() + " is added to the catalog ("
-            + serverName + ")");
-      }
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public BoolProto deleteTable(RpcController controller, StringProto name)
-        throws ServiceException {
-      wlock.lock();
-      try {
-        String tableId = name.getValue().toLowerCase();
-        if (!store.existTable(tableId)) {
-          throw new NoSuchTableException(tableId);
-        }
-        store.deleteTable(tableId);
-      } catch (IOException ioe) {
-        LOG.error(ioe);
-      } finally {
-        wlock.unlock();
-      }
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public BoolProto existsTable(RpcController controller, StringProto name)
-        throws ServiceException {
-      try {
-        String tableId = name.getValue().toLowerCase();
-        if (store.existTable(tableId)) {
-          return BOOL_TRUE;
-        } else {
-          return BOOL_FALSE;
-        }
-      } catch (IOException e) {
-        LOG.error(e);
-        throw new ServiceException(e);
-      }
-    }
-
-    @Override
-    public BoolProto addIndex(RpcController controller, IndexDescProto indexDesc)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        if (store.existIndex(indexDesc.getName())) {
-          throw new AlreadyExistsIndexException(indexDesc.getName());
-        }
-        store.addIndex(indexDesc);
-      } catch (IOException ioe) {
-        LOG.error("ERROR : cannot add index " + indexDesc.getName(), ioe);
-        LOG.error(indexDesc);
-        throw new ServiceException(ioe);
-      } finally {
-        rlock.unlock();
-      }
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public BoolProto existIndexByName(RpcController controller,
-                                      StringProto indexName)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        return BoolProto.newBuilder().setValue(
-            store.existIndex(indexName.getValue())).build();
-      } catch (IOException e) {
-        LOG.error(e);
-        return BoolProto.newBuilder().setValue(false).build();
-      } finally {
-        rlock.unlock();
-      }
-    }
-
-    @Override
-    public BoolProto existIndex(RpcController controller,
-                                GetIndexRequest request)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        return BoolProto.newBuilder().setValue(
-            store.existIndex(request.getTableName(),
-                request.getColumnName())).build();
-      } catch (IOException e) {
-        LOG.error(e);
-        return BoolProto.newBuilder().setValue(false).build();
-      } finally {
-        rlock.unlock();
-      }
-    }
-
-    @Override
-    public IndexDescProto getIndexByName(RpcController controller,
-                                         StringProto indexName)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        if (!store.existIndex(indexName.getValue())) {
-          throw new NoSuchIndexException(indexName.getValue());
-        }
-        return store.getIndex(indexName.getValue());
-      } catch (IOException ioe) {
-        LOG.error("ERROR : cannot get index " + indexName, ioe);
-        return null;
-      } finally {
-        rlock.unlock();
-      }
-    }
-
-    @Override
-    public IndexDescProto getIndex(RpcController controller,
-                                   GetIndexRequest request)
-        throws ServiceException {
-      rlock.lock();
-      try {
-        if (!store.existIndex(request.getTableName())) {
-          throw new NoSuchIndexException(request.getTableName() + "."
-              + request.getColumnName());
-        }
-        return store.getIndex(request.getTableName(), request.getColumnName());
-      } catch (IOException ioe) {
-        LOG.error("ERROR : cannot get index " + request.getTableName() + "."
-            + request.getColumnName(), ioe);
-        return null;
-      } finally {
-        rlock.unlock();
-      }
-    }
-
-    @Override
-    public BoolProto delIndex(RpcController controller, StringProto indexName)
-        throws ServiceException {
-      wlock.lock();
-      try {
-        if (!store.existIndex(indexName.getValue())) {
-          throw new NoSuchIndexException(indexName.getValue());
-        }
-        store.delIndex(indexName.getValue());
-      } catch (IOException e) {
-        LOG.error(e);
-      } finally {
-        wlock.unlock();
-      }
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public BoolProto registerFunction(RpcController controller,
-                                      FunctionDescProto funcDesc)
-        throws ServiceException {
-      String canonicalName =
-          CatalogUtil.getCanonicalName(funcDesc.getSignature(),
-              funcDesc.getParameterTypesList());
-      if (functions.containsKey(canonicalName)) {
-        throw new AlreadyExistsFunctionException(canonicalName);
-      }
-
-      functions.put(canonicalName, funcDesc);
-      if (LOG.isDebugEnabled()) {
-        LOG.info("Function " + canonicalName + " is registered.");
-      }
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public BoolProto unregisterFunction(RpcController controller,
-                                        UnregisterFunctionRequest request)
-        throws ServiceException {
-      String signature = request.getSignature();
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
-      }
-      String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
-      if (!functions.containsKey(canonicalName)) {
-        throw new NoSuchFunctionException(canonicalName);
-      }
-
-      functions.remove(canonicalName);
-      LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
-
-      return BOOL_TRUE;
-    }
-
-    @Override
-    public FunctionDescProto getFunctionMeta(RpcController controller,
-                                             GetFunctionMetaRequest request)
-        throws ServiceException {
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
-      }
-      return functions.get(CatalogUtil.getCanonicalName(
-          request.getSignature().toLowerCase(), paramTypes));
-    }
-
-    @Override
-    public BoolProto containFunction(RpcController controller,
-                                     ContainFunctionRequest request)
-        throws ServiceException {
-      List<DataType> paramTypes = new ArrayList<DataType>();
-      int size = request.getParameterTypesCount();
-      for (int i = 0; i < size; i++) {
-        paramTypes.add(request.getParameterTypes(i));
-      }
-      boolean returnValue =
-          functions.containsKey(CatalogUtil.getCanonicalName(
-              request.getSignature().toLowerCase(), paramTypes));
-      return BoolProto.newBuilder().setValue(returnValue).build();
-    }
-  }
-
-  public static void main(String[] args) throws Exception {
-    TajoConf conf = new TajoConf();
-    CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());
-    catalog.init(conf);
-    catalog.start();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
deleted file mode 100644
index 6856736..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package tajo.catalog;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import tajo.conf.TajoConf;
-
-import java.io.IOException;
-
-/**
- * This class provides a catalog service interface in
- * local.
- */
-public class LocalCatalog extends AbstractCatalogClient {
-  private static final Log LOG = LogFactory.getLog(LocalCatalog.class);
-  private CatalogServer catalog;
-
-  public LocalCatalog(final TajoConf conf) throws IOException {
-    this.catalog = new CatalogServer();
-    this.catalog.init(conf);
-    this.catalog.start();
-    setStub(catalog.getHandler());
-  }
-
-  public LocalCatalog(final CatalogServer server) {
-    this.catalog = server;
-    setStub(server.getHandler());
-  }
-
-  public void shutdown() {
-    this.catalog.stop();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
deleted file mode 100644
index 5117fe4..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog;
-
-import tajo.conf.TajoConf;
-
-import java.io.IOException;
-
-public class MiniCatalogServer {
-  private CatalogServer catalogServers;
-  
-  public MiniCatalogServer(TajoConf conf) throws IOException {
-    catalogServers = new CatalogServer();
-    catalogServers.init(conf);
-    catalogServers.start();
-  }
-  
-  public void shutdown() {
-    this.catalogServers.stop();
-  }
-  
-  public CatalogServer getCatalogServer() {
-    return this.catalogServers;
-  }
-  
-  public CatalogService getCatalog() {
-    return new LocalCatalog(this.catalogServers);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
deleted file mode 100644
index c4c7eaa..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog.store;
-
-import tajo.catalog.FunctionDesc;
-import tajo.catalog.TableDesc;
-import tajo.catalog.proto.CatalogProtos.IndexDescProto;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-public interface CatalogStore extends Closeable {
-  void addTable(TableDesc desc) throws IOException;
-  
-  boolean existTable(String name) throws IOException;
-  
-  void deleteTable(String name) throws IOException;
-  
-  TableDesc getTable(String name) throws IOException;
-  
-  List<String> getAllTableNames() throws IOException;
-  
-  void addIndex(IndexDescProto proto) throws IOException;
-  
-  void delIndex(String indexName) throws IOException;
-  
-  IndexDescProto getIndex(String indexName) throws IOException;
-  
-  IndexDescProto getIndex(String tableName, String columnName) 
-      throws IOException;
-  
-  boolean existIndex(String indexName) throws IOException;
-  
-  boolean existIndex(String tableName, String columnName) throws IOException;
-  
-  IndexDescProto [] getIndexes(String tableName) throws IOException;
-  
-  void addFunction(FunctionDesc func) throws IOException;
-  
-  void deleteFunction(FunctionDesc func) throws IOException;
-  
-  void existFunction(FunctionDesc func) throws IOException;
-  
-  List<String> getAllFunctionNames() throws IOException;
-}