You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:38 UTC
[44/51] [partial] TAJO-22: The package prefix should be
org.apache.tajo. (DaeMyung Kang via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java b/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
deleted file mode 100644
index d679c69..0000000
--- a/tajo-catalog/tajo-catalog-common/src/test/java/tajo/catalog/statistics/TestTableStat.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog.statistics;
-
-import org.junit.Test;
-import tajo.catalog.Column;
-import tajo.common.TajoDataTypes.Type;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestTableStat {
- @Test
- public final void testTableStat() throws CloneNotSupportedException {
- TableStat stat = new TableStat();
- stat.setNumRows(957685);
- stat.setNumBytes(1023234);
- stat.setNumBlocks(3123);
- stat.setNumPartitions(5);
- stat.setAvgRows(80000);
-
- int numCols = 3;
- ColumnStat[] cols = new ColumnStat[numCols];
- for (int i = 0; i < numCols; i++) {
- cols[i] = new ColumnStat(new Column("col_" + i, Type.INT8));
- cols[i].setNumDistVals(1024 * i);
- cols[i].setNumNulls(100 * i);
- stat.addColumnStat(cols[i]);
- }
-
- assertTrue(957685 == stat.getNumRows());
- assertTrue(1023234 == stat.getNumBytes());
- assertTrue(3123 == stat.getNumBlocks());
- assertTrue(5 == stat.getNumPartitions());
- assertTrue(80000 == stat.getAvgRows());
- assertEquals(3, stat.getColumnStats().size());
- for (int i = 0; i < numCols; i++) {
- assertEquals(cols[i], stat.getColumnStats().get(i));
- }
-
- TableStat stat2 = new TableStat(stat.getProto());
- tableStatEquals(stat, stat2);
-
- TableStat stat3 = (TableStat) stat.clone();
- tableStatEquals(stat, stat3);
- }
-
- public void tableStatEquals(TableStat s1, TableStat s2) {
- assertEquals(s1.getNumRows(), s2.getNumRows());
- assertEquals(s1.getNumBlocks(), s2.getNumBlocks());
- assertEquals(s1.getNumPartitions(), s2.getNumPartitions());
- assertEquals(s1.getAvgRows(), s2.getAvgRows());
- assertEquals(s1.getColumnStats().size(), s2.getColumnStats().size());
- for (int i = 0; i < s1.getColumnStats().size(); i++) {
- assertEquals(s1.getColumnStats().get(i), s2.getColumnStats().get(i));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
new file mode 100644
index 0000000..1cebd93
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
+import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
+import org.apache.tajo.catalog.store.CatalogStore;
+import org.apache.tajo.catalog.store.DBStore;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.rpc.ProtoBlockingRpcServer;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.InetSocketAddress;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This class provides the catalog service. The catalog service enables clients
+ * to register, unregister and access information about tables, functions, and
+ * cluster information.
+ */
+public class CatalogServer extends AbstractService {
+
+ private final static Log LOG = LogFactory.getLog(CatalogServer.class);
+ private TajoConf conf;
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Lock rlock = lock.readLock();
+ private final Lock wlock = lock.writeLock();
+
+ private CatalogStore store;
+
+ private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
+
+ // RPC variables
+ private ProtoBlockingRpcServer rpcServer;
+ private InetSocketAddress bindAddress;
+ private String serverName;
+ final CatalogProtocolHandler handler;
+
+ // Server status variables
+ private volatile boolean stopped = false;
+ @SuppressWarnings("unused")
+ private volatile boolean isOnline = false;
+
+ private static BoolProto BOOL_TRUE = BoolProto.newBuilder().
+ setValue(true).build();
+ private static BoolProto BOOL_FALSE = BoolProto.newBuilder().
+ setValue(false).build();
+
+ private List<FunctionDesc> builtingFuncs;
+
+ public CatalogServer() throws IOException {
+ super(CatalogServer.class.getName());
+ this.handler = new CatalogProtocolHandler();
+ this.builtingFuncs = new ArrayList<FunctionDesc>();
+ }
+
+ public CatalogServer(List<FunctionDesc> sqlFuncs) throws IOException {
+ this();
+ this.builtingFuncs = sqlFuncs;
+ }
+
+ @Override
+ public void init(Configuration _conf) {
+ this.conf = (TajoConf) _conf;
+
+ Constructor<?> cons;
+ try {
+ Class<?> storeClass =
+ this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
+ LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
+
+ cons = storeClass.
+ getConstructor(new Class [] {Configuration.class});
+
+ this.store = (CatalogStore) cons.newInstance(this.conf);
+
+ initBuiltinFunctions(builtingFuncs);
+ } catch (Throwable t) {
+ LOG.error("cannot initialize CatalogServer", t);
+ }
+
+ super.init(conf);
+ }
+
+ private void initBuiltinFunctions(List<FunctionDesc> functions)
+ throws ServiceException {
+ for (FunctionDesc desc : functions) {
+ handler.registerFunction(null, desc.getProto());
+ }
+ }
+
+ public void start() {
+ // Server to handle client requests.
+ String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
+ // Creation of a HSA will force a resolve.
+ InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
+ try {
+ this.rpcServer = new ProtoBlockingRpcServer(
+ CatalogProtocol.class,
+ handler, initIsa);
+ this.rpcServer.start();
+
+ this.bindAddress = this.rpcServer.getBindAddress();
+ this.serverName = org.apache.tajo.util.NetUtils.getIpPortString(bindAddress);
+ conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
+ } catch (Exception e) {
+ LOG.error("Cannot start RPC Server of CatalogServer", e);
+ }
+
+ LOG.info("Catalog Server startup (" + serverName + ")");
+ super.start();
+ }
+
+ public void stop() {
+ this.rpcServer.shutdown();
+ LOG.info("Catalog Server (" + serverName + ") shutdown");
+ super.stop();
+ }
+
+ public CatalogProtocolHandler getHandler() {
+ return this.handler;
+ }
+
+ public InetSocketAddress getBindAddress() {
+ return this.bindAddress;
+ }
+
+ public class CatalogProtocolHandler
+ implements CatalogProtocolService.BlockingInterface {
+
+ @Override
+ public TableDescProto getTableDesc(RpcController controller,
+ StringProto name)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ String tableId = name.getValue().toLowerCase();
+ if (!store.existTable(tableId)) {
+ throw new NoSuchTableException(tableId);
+ }
+ return (TableDescProto) store.getTable(tableId).getProto();
+ } catch (IOException ioe) {
+ // TODO - handle exception
+ LOG.error(ioe);
+ return null;
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public GetAllTableNamesResponse getAllTableNames(RpcController controller,
+ NullProto request)
+ throws ServiceException {
+ try {
+ Iterator<String> iterator = store.getAllTableNames().iterator();
+ GetAllTableNamesResponse.Builder builder =
+ GetAllTableNamesResponse.newBuilder();
+ while (iterator.hasNext()) {
+ builder.addTableName(iterator.next());
+ }
+ return builder.build();
+ } catch (IOException ioe) {
+ // TODO - handle exception
+ return null;
+ }
+ }
+
+ @Override
+ public GetFunctionsResponse getFunctions(RpcController controller,
+ NullProto request)
+ throws ServiceException {
+ Iterator<FunctionDescProto> iterator = functions.values().iterator();
+ GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
+ while (iterator.hasNext()) {
+ builder.addFunctionDesc(iterator.next());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public BoolProto addTable(RpcController controller, TableDescProto tableDesc)
+ throws ServiceException {
+ Preconditions.checkArgument(tableDesc.hasId(),
+ "Must be set to the table name");
+ Preconditions.checkArgument(tableDesc.hasPath(),
+ "Must be set to the table URI");
+
+ wlock.lock();
+ try {
+ if (store.existTable(tableDesc.getId())) {
+ throw new AlreadyExistsTableException(tableDesc.getId());
+ }
+
+ // rewrite schema
+ SchemaProto revisedSchema =
+ CatalogUtil.getQualfiedSchema(tableDesc.getId(), tableDesc.getMeta()
+ .getSchema());
+
+ TableProto.Builder metaBuilder = TableProto.newBuilder(tableDesc.getMeta());
+ metaBuilder.setSchema(revisedSchema);
+ TableDescProto.Builder descBuilder = TableDescProto.newBuilder(tableDesc);
+ descBuilder.setMeta(metaBuilder.build());
+
+ store.addTable(new TableDescImpl(descBuilder.build()));
+
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ } finally {
+ wlock.unlock();
+ LOG.info("Table " + tableDesc.getId() + " is added to the catalog ("
+ + serverName + ")");
+ }
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public BoolProto deleteTable(RpcController controller, StringProto name)
+ throws ServiceException {
+ wlock.lock();
+ try {
+ String tableId = name.getValue().toLowerCase();
+ if (!store.existTable(tableId)) {
+ throw new NoSuchTableException(tableId);
+ }
+ store.deleteTable(tableId);
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ } finally {
+ wlock.unlock();
+ }
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public BoolProto existsTable(RpcController controller, StringProto name)
+ throws ServiceException {
+ try {
+ String tableId = name.getValue().toLowerCase();
+ if (store.existTable(tableId)) {
+ return BOOL_TRUE;
+ } else {
+ return BOOL_FALSE;
+ }
+ } catch (IOException e) {
+ LOG.error(e);
+ throw new ServiceException(e);
+ }
+ }
+
+ @Override
+ public BoolProto addIndex(RpcController controller, IndexDescProto indexDesc)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ if (store.existIndex(indexDesc.getName())) {
+ throw new AlreadyExistsIndexException(indexDesc.getName());
+ }
+ store.addIndex(indexDesc);
+ } catch (IOException ioe) {
+ LOG.error("ERROR : cannot add index " + indexDesc.getName(), ioe);
+ LOG.error(indexDesc);
+ throw new ServiceException(ioe);
+ } finally {
+ rlock.unlock();
+ }
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public BoolProto existIndexByName(RpcController controller,
+ StringProto indexName)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ return BoolProto.newBuilder().setValue(
+ store.existIndex(indexName.getValue())).build();
+ } catch (IOException e) {
+ LOG.error(e);
+ return BoolProto.newBuilder().setValue(false).build();
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public BoolProto existIndex(RpcController controller,
+ GetIndexRequest request)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ return BoolProto.newBuilder().setValue(
+ store.existIndex(request.getTableName(),
+ request.getColumnName())).build();
+ } catch (IOException e) {
+ LOG.error(e);
+ return BoolProto.newBuilder().setValue(false).build();
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public IndexDescProto getIndexByName(RpcController controller,
+ StringProto indexName)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ if (!store.existIndex(indexName.getValue())) {
+ throw new NoSuchIndexException(indexName.getValue());
+ }
+ return store.getIndex(indexName.getValue());
+ } catch (IOException ioe) {
+ LOG.error("ERROR : cannot get index " + indexName, ioe);
+ return null;
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public IndexDescProto getIndex(RpcController controller,
+ GetIndexRequest request)
+ throws ServiceException {
+ rlock.lock();
+ try {
+ if (!store.existIndex(request.getTableName())) {
+ throw new NoSuchIndexException(request.getTableName() + "."
+ + request.getColumnName());
+ }
+ return store.getIndex(request.getTableName(), request.getColumnName());
+ } catch (IOException ioe) {
+ LOG.error("ERROR : cannot get index " + request.getTableName() + "."
+ + request.getColumnName(), ioe);
+ return null;
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public BoolProto delIndex(RpcController controller, StringProto indexName)
+ throws ServiceException {
+ wlock.lock();
+ try {
+ if (!store.existIndex(indexName.getValue())) {
+ throw new NoSuchIndexException(indexName.getValue());
+ }
+ store.delIndex(indexName.getValue());
+ } catch (IOException e) {
+ LOG.error(e);
+ } finally {
+ wlock.unlock();
+ }
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public BoolProto registerFunction(RpcController controller,
+ FunctionDescProto funcDesc)
+ throws ServiceException {
+ String canonicalName =
+ CatalogUtil.getCanonicalName(funcDesc.getSignature(),
+ funcDesc.getParameterTypesList());
+ if (functions.containsKey(canonicalName)) {
+ throw new AlreadyExistsFunctionException(canonicalName);
+ }
+
+ functions.put(canonicalName, funcDesc);
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Function " + canonicalName + " is registered.");
+ }
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public BoolProto unregisterFunction(RpcController controller,
+ UnregisterFunctionRequest request)
+ throws ServiceException {
+ String signature = request.getSignature();
+ List<DataType> paramTypes = new ArrayList<DataType>();
+ int size = request.getParameterTypesCount();
+ for (int i = 0; i < size; i++) {
+ paramTypes.add(request.getParameterTypes(i));
+ }
+ String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
+ if (!functions.containsKey(canonicalName)) {
+ throw new NoSuchFunctionException(canonicalName);
+ }
+
+ functions.remove(canonicalName);
+ LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
+
+ return BOOL_TRUE;
+ }
+
+ @Override
+ public FunctionDescProto getFunctionMeta(RpcController controller,
+ GetFunctionMetaRequest request)
+ throws ServiceException {
+ List<DataType> paramTypes = new ArrayList<DataType>();
+ int size = request.getParameterTypesCount();
+ for (int i = 0; i < size; i++) {
+ paramTypes.add(request.getParameterTypes(i));
+ }
+ return functions.get(CatalogUtil.getCanonicalName(
+ request.getSignature().toLowerCase(), paramTypes));
+ }
+
+ @Override
+ public BoolProto containFunction(RpcController controller,
+ ContainFunctionRequest request)
+ throws ServiceException {
+ List<DataType> paramTypes = new ArrayList<DataType>();
+ int size = request.getParameterTypesCount();
+ for (int i = 0; i < size; i++) {
+ paramTypes.add(request.getParameterTypes(i));
+ }
+ boolean returnValue =
+ functions.containsKey(CatalogUtil.getCanonicalName(
+ request.getSignature().toLowerCase(), paramTypes));
+ return BoolProto.newBuilder().setValue(returnValue).build();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TajoConf conf = new TajoConf();
+ CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());
+ catalog.init(conf);
+ catalog.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
new file mode 100644
index 0000000..3a98ca0
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalog.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+/**
+ * This class provides a catalog service interface in
+ * local.
+ */
+public class LocalCatalog extends AbstractCatalogClient {
+ private static final Log LOG = LogFactory.getLog(LocalCatalog.class);
+ private CatalogServer catalog;
+
+ public LocalCatalog(final TajoConf conf) throws IOException {
+ this.catalog = new CatalogServer();
+ this.catalog.init(conf);
+ this.catalog.start();
+ setStub(catalog.getHandler());
+ }
+
+ public LocalCatalog(final CatalogServer server) {
+ this.catalog = server;
+ setStub(server.getHandler());
+ }
+
+ public void shutdown() {
+ this.catalog.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
new file mode 100644
index 0000000..a31f3c0
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/MiniCatalogServer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog;
+
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+public class MiniCatalogServer {
+ private CatalogServer catalogServers;
+
+ public MiniCatalogServer(TajoConf conf) throws IOException {
+ catalogServers = new CatalogServer();
+ catalogServers.init(conf);
+ catalogServers.start();
+ }
+
+ public void shutdown() {
+ this.catalogServers.stop();
+ }
+
+ public CatalogServer getCatalogServer() {
+ return this.catalogServers;
+ }
+
+ public CatalogService getCatalog() {
+ return new LocalCatalog(this.catalogServers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
new file mode 100644
index 0000000..44f4e7a
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.store;
+
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+public interface CatalogStore extends Closeable {
+ void addTable(TableDesc desc) throws IOException;
+
+ boolean existTable(String name) throws IOException;
+
+ void deleteTable(String name) throws IOException;
+
+ TableDesc getTable(String name) throws IOException;
+
+ List<String> getAllTableNames() throws IOException;
+
+ void addIndex(IndexDescProto proto) throws IOException;
+
+ void delIndex(String indexName) throws IOException;
+
+ IndexDescProto getIndex(String indexName) throws IOException;
+
+ IndexDescProto getIndex(String tableName, String columnName)
+ throws IOException;
+
+ boolean existIndex(String indexName) throws IOException;
+
+ boolean existIndex(String tableName, String columnName) throws IOException;
+
+ IndexDescProto [] getIndexes(String tableName) throws IOException;
+
+ void addFunction(FunctionDesc func) throws IOException;
+
+ void deleteFunction(FunctionDesc func) throws IOException;
+
+ void existFunction(FunctionDesc func) throws IOException;
+
+ List<String> getAllFunctionNames() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
new file mode 100644
index 0000000..41b2ca1
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/DBStore.java
@@ -0,0 +1,1033 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.*;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.InternalException;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class DBStore implements CatalogStore {
+ private final Log LOG = LogFactory.getLog(DBStore.class);
+ private final Configuration conf;
+ private final String driver;
+ private final String jdbcUri;
+ private Connection conn;
+
+ private static final int VERSION = 1;
+
+ private static final String TB_META = "META";
+ private static final String TB_TABLES = "TABLES";
+ private static final String TB_COLUMNS = "COLUMNS";
+ private static final String TB_OPTIONS = "OPTIONS";
+ private static final String TB_INDEXES = "INDEXES";
+ private static final String TB_STATISTICS = "STATS";
+
+ private static final String C_TABLE_ID = "TABLE_ID";
+
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private Lock rlock = lock.readLock();
+ private Lock wlock = lock.writeLock();
+
+ public DBStore(final Configuration conf)
+ throws InternalException {
+ this.conf = conf;
+
+ this.driver =
+ this.conf.get(CatalogConstants.JDBC_DRIVER, CatalogConstants.DEFAULT_JDBC_DRIVER);
+ this.jdbcUri =
+ this.conf.get(CatalogConstants.JDBC_URI);
+
+ try {
+ Class.forName(driver).newInstance();
+ LOG.info("Loaded the JDBC driver (" + driver +")");
+ } catch (Exception e) {
+ throw new InternalException("Cannot load JDBC driver " + driver, e);
+ }
+
+ try {
+ LOG.info("Trying to connect database (" + jdbcUri + ")");
+ conn = DriverManager.getConnection(jdbcUri + ";create=true");
+ LOG.info("Connected to database (" + jdbcUri +")");
+ } catch (SQLException e) {
+ throw new InternalException("Cannot connect to database (" + jdbcUri
+ +")", e);
+ }
+
+ try {
+ if (!isInitialized()) {
+ LOG.info("The base tables of CatalogServer are created.");
+ createBaseTable();
+ } else {
+ LOG.info("The base tables of CatalogServer already is initialized.");
+ }
+ } catch (SQLException se) {
+ throw new InternalException(
+ "Cannot initialize the persistent storage of Catalog", se);
+ }
+
+ int dbVersion = 0;
+ try {
+ dbVersion = needUpgrade();
+ } catch (SQLException e) {
+ throw new InternalException(
+ "Cannot check if the DB need to be upgraded", e);
+ }
+
+// if (dbVersion < VERSION) {
+// LOG.info("DB Upgrade is needed");
+// try {
+// upgrade(dbVersion, VERSION);
+// } catch (SQLException e) {
+// LOG.error(e.getMessage());
+// throw new InternalException("DB upgrade is failed.", e);
+// }
+// }
+ }
+
+ private int needUpgrade() throws SQLException {
+ String sql = "SELECT VERSION FROM " + TB_META;
+ Statement stmt = conn.createStatement();
+ ResultSet res = stmt.executeQuery(sql);
+
+ if (res.next() == false) { // if this db version is 0
+ insertVersion();
+ return 0;
+ } else {
+ return res.getInt(1);
+ }
+ }
+
+ private void insertVersion() throws SQLException {
+ String sql = "INSERT INTO " + TB_META + " values (0)";
+ Statement stmt = conn.createStatement();
+ stmt.executeUpdate(sql);
+ stmt.close();
+ }
+
+ private void upgrade(int from, int to) throws SQLException {
+ String sql;
+ Statement stmt;
+ if (from == 0 ) {
+ if (to == 1) {
+ sql = "DROP INDEX idx_options_key";
+ LOG.info(sql);
+
+ stmt = conn.createStatement();
+ stmt.addBatch(sql);
+
+ sql =
+ "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+ stmt.addBatch(sql);
+ LOG.info(sql);
+ stmt.executeBatch();
+ stmt.close();
+
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ } else {
+ LOG.info("DB Upgraded from " + from + " to " + to);
+ }
+ }
+ }
+
+ // TODO - DDL and index statements should be renamed
+ private void createBaseTable() throws SQLException {
+ wlock.lock();
+ try {
+ // META
+ Statement stmt = conn.createStatement();
+ String meta_ddl = "CREATE TABLE " + TB_META + " (version int NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(meta_ddl);
+ }
+ stmt.executeUpdate(meta_ddl);
+ LOG.info("Table '" + TB_META + " is created.");
+
+ // TABLES
+ stmt = conn.createStatement();
+ String tables_ddl = "CREATE TABLE "
+ + TB_TABLES + " ("
+ + "TID int NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), "
+ + C_TABLE_ID + " VARCHAR(256) NOT NULL CONSTRAINT TABLE_ID_UNIQ UNIQUE, "
+ + "path VARCHAR(1024), "
+ + "store_type CHAR(16), "
+ + "options VARCHAR(32672), "
+ + "CONSTRAINT TABLES_PK PRIMARY KEY (TID)" +
+ ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(tables_ddl);
+ }
+ stmt.addBatch(tables_ddl);
+ String idx_tables_tid =
+ "CREATE UNIQUE INDEX idx_tables_tid on " + TB_TABLES + " (TID)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_tables_tid);
+ }
+ stmt.addBatch(idx_tables_tid);
+
+ String idx_tables_name = "CREATE UNIQUE INDEX idx_tables_name on "
+ + TB_TABLES + "(" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_tables_name);
+ }
+ stmt.addBatch(idx_tables_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_TABLES + "' is created.");
+
+ // COLUMNS
+ stmt = conn.createStatement();
+ String columns_ddl =
+ "CREATE TABLE " + TB_COLUMNS + " ("
+ + "TID INT NOT NULL REFERENCES " + TB_TABLES + " (TID) ON DELETE CASCADE, "
+ + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES " + TB_TABLES + "("
+ + C_TABLE_ID + ") ON DELETE CASCADE, "
+ + "column_id INT NOT NULL,"
+ + "column_name VARCHAR(256) NOT NULL, " + "data_type CHAR(16), "
+ + "CONSTRAINT C_COLUMN_ID UNIQUE (" + C_TABLE_ID + ", column_name))";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(columns_ddl);
+ }
+ stmt.addBatch(columns_ddl);
+
+ String idx_fk_columns_table_name =
+ "CREATE UNIQUE INDEX idx_fk_columns_table_name on "
+ + TB_COLUMNS + "(" + C_TABLE_ID + ", column_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_fk_columns_table_name);
+ }
+ stmt.addBatch(idx_fk_columns_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_COLUMNS + " is created.");
+
+ // OPTIONS
+ stmt = conn.createStatement();
+ String options_ddl =
+ "CREATE TABLE " + TB_OPTIONS +" ("
+ + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES TABLES (" + C_TABLE_ID +") "
+ + "ON DELETE CASCADE, "
+ + "key_ VARCHAR(256) NOT NULL, value_ VARCHAR(256) NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(options_ddl);
+ }
+ stmt.addBatch(options_ddl);
+
+ String idx_options_key =
+ "CREATE INDEX idx_options_key on " + TB_OPTIONS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_options_key);
+ }
+ stmt.addBatch(idx_options_key);
+ String idx_options_table_name =
+ "CREATE INDEX idx_options_table_name on " + TB_OPTIONS
+ + "(" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_options_table_name);
+ }
+ stmt.addBatch(idx_options_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_OPTIONS + " is created.");
+
+ // INDEXES
+ stmt = conn.createStatement();
+ String indexes_ddl = "CREATE TABLE " + TB_INDEXES +"("
+ + "index_name VARCHAR(256) NOT NULL PRIMARY KEY, "
+ + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "column_name VARCHAR(256) NOT NULL, "
+ + "data_type VARCHAR(256) NOT NULL, "
+ + "index_type CHAR(32) NOT NULL, "
+ + "is_unique BOOLEAN NOT NULL, "
+ + "is_clustered BOOLEAN NOT NULL, "
+ + "is_ascending BOOLEAN NOT NULL)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(indexes_ddl);
+ }
+ stmt.addBatch(indexes_ddl);
+
+ String idx_indexes_key = "CREATE UNIQUE INDEX idx_indexes_key ON "
+ + TB_INDEXES + " (index_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_indexes_key);
+ }
+ stmt.addBatch(idx_indexes_key);
+
+ String idx_indexes_columns = "CREATE INDEX idx_indexes_columns ON "
+ + TB_INDEXES + " (" + C_TABLE_ID + ", column_name)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_indexes_columns);
+ }
+ stmt.addBatch(idx_indexes_columns);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_INDEXES + "' is created.");
+
+ String stats_ddl = "CREATE TABLE " + TB_STATISTICS + "("
+ + C_TABLE_ID + " VARCHAR(256) NOT NULL REFERENCES TABLES (" + C_TABLE_ID + ") "
+ + "ON DELETE CASCADE, "
+ + "num_rows BIGINT, "
+ + "num_bytes BIGINT)";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stats_ddl);
+ }
+ stmt.addBatch(stats_ddl);
+
+ String idx_stats_fk_table_name = "CREATE INDEX idx_stats_table_name ON "
+ + TB_STATISTICS + " (" + C_TABLE_ID + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(idx_stats_fk_table_name);
+ }
+ stmt.addBatch(idx_stats_fk_table_name);
+ stmt.executeBatch();
+ LOG.info("Table '" + TB_STATISTICS + "' is created.");
+
+ } finally {
+ wlock.unlock();
+ }
+ }
+
+ private boolean isInitialized() throws SQLException {
+ wlock.lock();
+ try {
+ boolean found = false;
+ ResultSet res = conn.getMetaData().getTables(null, null, null,
+ new String [] {"TABLE"});
+
+ String resName;
+ while (res.next() && !found) {
+ resName = res.getString("TABLE_NAME");
+ if (TB_META.equals(resName)
+ || TB_TABLES.equals(resName)
+ || TB_COLUMNS.equals(resName)
+ || TB_OPTIONS.equals(resName)) {
+ return true;
+ }
+ }
+ } finally {
+ wlock.unlock();
+ }
+ return false;
+ }
+
+ final boolean checkInternalTable(final String tableName) throws SQLException {
+ rlock.lock();
+ try {
+ boolean found = false;
+ ResultSet res = conn.getMetaData().getTables(null, null, null,
+ new String [] {"TABLE"});
+ while(res.next() && !found) {
+ if (tableName.equals(res.getString("TABLE_NAME")))
+ found = true;
+ }
+
+ return found;
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ @Override
+ public final void addTable(final TableDesc table) throws IOException {
+ Statement stmt = null;
+ ResultSet res;
+
+ String sql =
+ "INSERT INTO " + TB_TABLES + " (" + C_TABLE_ID + ", path, store_type) "
+ + "VALUES('" + table.getId() + "', "
+ + "'" + table.getPath() + "', "
+ + "'" + table.getMeta().getStoreType() + "'"
+ + ")";
+
+ wlock.lock();
+ try {
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.addBatch(sql);
+ stmt.executeBatch();
+
+ stmt = conn.createStatement();
+ sql = "SELECT TID from " + TB_TABLES + " WHERE " + C_TABLE_ID
+ + " = '" + table.getId() + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no tid matched to "
+ + table.getId());
+ }
+ int tid = res.getInt("TID");
+
+ String colSql;
+ int columnId = 0;
+ for (Column col : table.getMeta().getSchema().getColumns()) {
+ colSql = columnToSQL(tid, table, columnId, col);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(colSql);
+ }
+ stmt.addBatch(colSql);
+ columnId++;
+ }
+
+ Iterator<Entry<String,String>> it = table.getMeta().getOptions();
+ String optSql;
+ while (it.hasNext()) {
+ optSql = keyvalToSQL(table, it.next());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(optSql);
+ }
+ stmt.addBatch(optSql);
+ }
+ stmt.executeBatch();
+ if (table.getMeta().getStat() != null) {
+ sql = "INSERT INTO " + TB_STATISTICS + " (" + C_TABLE_ID + ", num_rows, num_bytes) "
+ + "VALUES ('" + table.getId() + "', "
+ + table.getMeta().getStat().getNumRows() + ","
+ + table.getMeta().getStat().getNumBytes() + ")";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.addBatch(sql);
+ stmt.executeBatch();
+ }
+ } catch (SQLException se) {
+ throw new IOException(se.getMessage(), se);
+ } finally {
+ wlock.unlock();
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ }
+ }
+ }
+
+ private String columnToSQL(final int tid, final TableDesc desc,
+ final int columnId, final Column col) {
+ String sql =
+ "INSERT INTO " + TB_COLUMNS
+ + " (tid, " + C_TABLE_ID + ", column_id, column_name, data_type) "
+ + "VALUES("
+ + tid + ","
+ + "'" + desc.getId() + "',"
+ + columnId + ", "
+ + "'" + col.getColumnName() + "',"
+ + "'" + col.getDataType().getType().name() + "'"
+ + ")";
+
+ return sql;
+ }
+
+ private String keyvalToSQL(final TableDesc desc,
+ final Entry<String,String> keyVal) {
+ String sql =
+ "INSERT INTO " + TB_OPTIONS
+ + " (" + C_TABLE_ID + ", key_, value_) "
+ + "VALUES("
+ + "'" + desc.getId() + "',"
+ + "'" + keyVal.getKey() + "',"
+ + "'" + keyVal.getValue() + "'"
+ + ")";
+
+ return sql;
+ }
+
+ @Override
+ public final boolean existTable(final String name) throws IOException {
+ StringBuilder sql = new StringBuilder();
+ sql.append("SELECT " + C_TABLE_ID + " from ")
+ .append(TB_TABLES)
+ .append(" WHERE " + C_TABLE_ID + " = '")
+ .append(name)
+ .append("'");
+
+ Statement stmt = null;
+ boolean exist = false;
+ rlock.lock();
+ try {
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql.toString());
+ }
+ ResultSet res = stmt.executeQuery(sql.toString());
+ exist = res.next();
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ rlock.unlock();
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ return exist;
+ }
+
+ @Override
+ public final void deleteTable(final String name) throws IOException {
+ Statement stmt = null;
+ String sql = null;
+ try {
+ wlock.lock();
+ try {
+ stmt = conn.createStatement();
+ sql = "DELETE FROM " + TB_COLUMNS +
+ " WHERE " + C_TABLE_ID + " = '" + name + "'";
+ LOG.info(sql);
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ try {
+ sql = "DELETE FROM " + TB_OPTIONS +
+ " WHERE " + C_TABLE_ID + " = '" + name + "'";
+ LOG.info(sql);
+ stmt = conn.createStatement();
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ try {
+ sql = "DELETE FROM " + TB_STATISTICS +
+ " WHERE " + C_TABLE_ID + " = '" + name + "'";
+ LOG.info(sql);
+ stmt = conn.createStatement();
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ try {
+ sql = "DELETE FROM " + TB_TABLES +
+ " WHERE " + C_TABLE_ID +" = '" + name + "'";
+ LOG.info(sql);
+ stmt = conn.createStatement();
+ stmt.execute(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+
+ } finally {
+ wlock.unlock();
+ }
+ }
+
+ @Override
+ public final TableDesc getTable(final String name) throws IOException {
+ ResultSet res = null;
+ Statement stmt = null;
+
+ String tableName = null;
+ Path path = null;
+ StoreType storeType = null;
+ Options options;
+ TableStat stat = null;
+
+ rlock.lock();
+ try {
+ try {
+ String sql =
+ "SELECT " + C_TABLE_ID + ", path, store_type from " + TB_TABLES
+ + " WHERE " + C_TABLE_ID + "='" + name + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt = conn.createStatement();
+ res = stmt.executeQuery(sql);
+ if (!res.next()) { // there is no table of the given name.
+ return null;
+ }
+ tableName = res.getString(C_TABLE_ID).trim();
+ path = new Path(res.getString("path").trim());
+ storeType = CatalogUtil.getStoreType(res.getString("store_type").trim());
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ stmt.close();
+ res.close();
+ }
+
+ Schema schema = null;
+ try {
+ String sql = "SELECT column_name, data_type from " + TB_COLUMNS
+ + " WHERE " + C_TABLE_ID + "='" + name + "' ORDER by column_id asc";
+
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ schema = new Schema();
+ while (res.next()) {
+ String columnName = tableName + "."
+ + res.getString("column_name").trim();
+ Type dataType = getDataType(res.getString("data_type")
+ .trim());
+ schema.addColumn(columnName, dataType);
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ stmt.close();
+ res.close();
+ }
+
+ options = Options.create();
+ try {
+ String sql = "SELECT key_, value_ from " + TB_OPTIONS
+ + " WHERE " + C_TABLE_ID + "='" + name + "'";
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+
+ while (res.next()) {
+ options.put(
+ res.getString("key_"),
+ res.getString("value_"));
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ stmt.close();
+ res.close();
+ }
+
+ try {
+ String sql = "SELECT num_rows, num_bytes from " + TB_STATISTICS
+ + " WHERE " + C_TABLE_ID + "='" + name + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt = conn.createStatement();
+ res = stmt.executeQuery(sql);
+
+ if (res.next()) {
+ stat = new TableStat();
+ stat.setNumRows(res.getLong("num_rows"));
+ stat.setNumBytes(res.getLong("num_bytes"));
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ stmt.close();
+ res.close();
+ }
+
+ TableMeta meta = new TableMetaImpl(schema, storeType, options);
+ if (stat != null) {
+ meta.setStat(stat);
+ }
+ TableDesc table = new TableDescImpl(tableName, meta, path);
+
+ return table;
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ rlock.unlock();
+ }
+ }
+
+ private Type getDataType(final String typeStr) {
+ try {
+ return Enum.valueOf(Type.class, typeStr);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Cannot find a matched type aginst from '" + typeStr + "'");
+ return null;
+ }
+ }
+
+ @Override
+ public final List<String> getAllTableNames() throws IOException {
+ String sql = "SELECT " + C_TABLE_ID + " from " + TB_TABLES;
+
+ Statement stmt = null;
+ ResultSet res;
+
+ List<String> tables = new ArrayList<String>();
+ rlock.lock();
+ try {
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery(sql);
+ while (res.next()) {
+ tables.add(res.getString(C_TABLE_ID).trim());
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ rlock.unlock();
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+ return tables;
+ }
+
+ public final void addIndex(final IndexDescProto proto) throws IOException {
+ String sql =
+ "INSERT INTO indexes (index_name, " + C_TABLE_ID + ", column_name, "
+ +"data_type, index_type, is_unique, is_clustered, is_ascending) VALUES "
+ +"(?,?,?,?,?,?,?,?)";
+
+ PreparedStatement stmt = null;
+
+ wlock.lock();
+ try {
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, proto.getName());
+ stmt.setString(2, proto.getTableId());
+ stmt.setString(3, proto.getColumn().getColumnName());
+ stmt.setString(4, proto.getColumn().getDataType().getType().name());
+ stmt.setString(5, proto.getIndexMethod().toString());
+ stmt.setBoolean(6, proto.hasIsUnique() && proto.getIsUnique());
+ stmt.setBoolean(7, proto.hasIsClustered() && proto.getIsClustered());
+ stmt.setBoolean(8, proto.hasIsAscending() && proto.getIsAscending());
+ stmt.executeUpdate();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stmt.toString());
+ }
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ wlock.unlock();
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+ }
+
+ public final void delIndex(final String indexName) throws IOException {
+ String sql =
+ "DELETE FROM " + TB_INDEXES
+ + " WHERE index_name='" + indexName + "'";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ Statement stmt = null;
+ wlock.lock();
+ try {
+ stmt = conn.createStatement();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ stmt.executeUpdate(sql);
+ } catch (SQLException se) {
+ throw new IOException(se);
+ } finally {
+ wlock.unlock();
+ try {
+ if (stmt != null) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ }
+ }
+ }
+
+ public final IndexDescProto getIndex(final String indexName)
+ throws IOException {
+ ResultSet res;
+ PreparedStatement stmt;
+
+ IndexDescProto proto = null;
+
+ rlock.lock();
+ try {
+ String sql =
+ "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+ + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+ + "where index_name = ?";
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, indexName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(stmt.toString());
+ }
+ res = stmt.executeQuery();
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no index matched to " + indexName);
+ }
+ proto = resultToProto(res);
+ } catch (SQLException se) {
+ } finally {
+ rlock.unlock();
+ }
+
+ return proto;
+ }
+
+ public final IndexDescProto getIndex(final String tableName,
+ final String columnName) throws IOException {
+ ResultSet res;
+ PreparedStatement stmt;
+
+ IndexDescProto proto = null;
+
+ rlock.lock();
+ try {
+ String sql =
+ "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+ + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+ + "where " + C_TABLE_ID + " = ? AND column_name = ?";
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, tableName);
+ stmt.setString(2, columnName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery();
+ if (!res.next()) {
+ throw new IOException("ERROR: there is no index matched to "
+ + tableName + "." + columnName);
+ }
+ proto = resultToProto(res);
+ } catch (SQLException se) {
+ new IOException(se);
+ } finally {
+ rlock.unlock();
+ }
+
+ return proto;
+ }
+
+ public final boolean existIndex(final String indexName) throws IOException {
+ String sql = "SELECT index_name from " + TB_INDEXES
+ + " WHERE index_name = ?";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ PreparedStatement stmt = null;
+ boolean exist = false;
+ rlock.lock();
+ try {
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, indexName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ ResultSet res = stmt.executeQuery();
+ exist = res.next();
+ } catch (SQLException se) {
+
+ } finally {
+ rlock.unlock();
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ }
+ }
+
+ return exist;
+ }
+
+ @Override
+ public boolean existIndex(String tableName, String columnName)
+ throws IOException {
+ String sql = "SELECT index_name from " + TB_INDEXES
+ + " WHERE " + C_TABLE_ID + " = ? AND COLUMN_NAME = ?";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ PreparedStatement stmt = null;
+ boolean exist = false;
+ rlock.lock();
+ try {
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, tableName);
+ stmt.setString(2, columnName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ ResultSet res = stmt.executeQuery();
+ exist = res.next();
+ } catch (SQLException se) {
+
+ } finally {
+ rlock.unlock();
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ }
+ }
+
+ return exist;
+ }
+
+ public final IndexDescProto [] getIndexes(final String tableName)
+ throws IOException {
+ ResultSet res;
+ PreparedStatement stmt;
+
+ List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+
+ rlock.lock();
+ try {
+ String sql = "SELECT index_name, " + C_TABLE_ID + ", column_name, data_type, "
+ + "index_type, is_unique, is_clustered, is_ascending FROM indexes "
+ + "where " + C_TABLE_ID + "= ?";
+ stmt = conn.prepareStatement(sql);
+ stmt.setString(1, tableName);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+ res = stmt.executeQuery();
+ while (res.next()) {
+ protos.add(resultToProto(res));
+ }
+ } catch (SQLException se) {
+ } finally {
+ rlock.unlock();
+ }
+
+ return protos.toArray(new IndexDescProto [protos.size()]);
+ }
+
+ private IndexDescProto resultToProto(final ResultSet res) throws SQLException {
+ IndexDescProto.Builder builder = IndexDescProto.newBuilder();
+ builder.setName(res.getString("index_name"));
+ builder.setTableId(res.getString(C_TABLE_ID));
+ builder.setColumn(resultToColumnProto(res));
+ builder.setIndexMethod(getIndexMethod(res.getString("index_type").trim()));
+ builder.setIsUnique(res.getBoolean("is_unique"));
+ builder.setIsClustered(res.getBoolean("is_clustered"));
+ builder.setIsAscending(res.getBoolean("is_ascending"));
+ return builder.build();
+ }
+
+ private ColumnProto resultToColumnProto(final ResultSet res) throws SQLException {
+ ColumnProto.Builder builder = ColumnProto.newBuilder();
+ builder.setColumnName(res.getString("column_name"));
+ builder.setDataType(CatalogUtil.newDataTypeWithoutLen(
+ getDataType(res.getString("data_type").trim())));
+ return builder.build();
+ }
+
+ private IndexMethod getIndexMethod(final String typeStr) {
+ if (typeStr.equals(IndexMethod.TWO_LEVEL_BIN_TREE.toString())) {
+ return IndexMethod.TWO_LEVEL_BIN_TREE;
+ } else {
+ LOG.error("Cannot find a matched type aginst from '"
+ + typeStr + "'");
+ // TODO - needs exception handling
+ return null;
+ }
+ }
+
+ @Override
+ public final void addFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final void deleteFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final void existFunction(final FunctionDesc func) throws IOException {
+ // TODO - not implemented yet
+ }
+
+ @Override
+ public final List<String> getAllFunctionNames() throws IOException {
+ // TODO - not implemented yet
+ return null;
+ }
+
+ @Override
+ public final void close() {
+ try {
+ DriverManager.getConnection("jdbc:derby:;shutdown=true");
+ } catch (SQLException e) {
+ // TODO - to be fixed
+ //LOG.error(e.getMessage(), e);
+ }
+
+ LOG.info("Shutdown database (" + jdbcUri + ")");
+ }
+
+
+ public static void main(final String[] args) throws IOException {
+ @SuppressWarnings("unused")
+ DBStore store = new DBStore(new TajoConf());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
new file mode 100644
index 0000000..4aacefd
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo.catalog.store;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.FunctionDesc;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class MemStore implements CatalogStore {
+ private final Map<String,TableDesc> tables
+ = Maps.newHashMap();
+ private final Map<String, FunctionDesc> functions
+ = Maps.newHashMap();
+ private final Map<String, IndexDescProto> indexes
+ = Maps.newHashMap();
+ private final Map<String, IndexDescProto> indexesByColumn
+ = Maps.newHashMap();
+
+ public MemStore(Configuration conf) {
+ }
+
+ /* (non-Javadoc)
+ * @see java.io.Closeable#close()
+ */
+ @Override
+ public void close() throws IOException {
+ tables.clear();
+ functions.clear();
+ indexes.clear();
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#addTable(TableDesc)
+ */
+ @Override
+ public void addTable(TableDesc desc) throws IOException {
+ synchronized(tables) {
+ tables.put(desc.getId(), desc);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#existTable(java.lang.String)
+ */
+ @Override
+ public boolean existTable(String name) throws IOException {
+ synchronized(tables) {
+ return tables.containsKey(name);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#deleteTable(java.lang.String)
+ */
+ @Override
+ public void deleteTable(String name) throws IOException {
+ synchronized(tables) {
+ tables.remove(name);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getTable(java.lang.String)
+ */
+ @Override
+ public TableDesc getTable(String name) throws IOException {
+ return tables.get(name);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getAllTableNames()
+ */
+ @Override
+ public List<String> getAllTableNames() throws IOException {
+ return new ArrayList<String>(tables.keySet());
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#addIndex(nta.catalog.proto.CatalogProtos.IndexDescProto)
+ */
+ @Override
+ public void addIndex(IndexDescProto proto) throws IOException {
+ synchronized(indexes) {
+ indexes.put(proto.getName(), proto);
+ indexesByColumn.put(proto.getTableId() + "."
+ + proto.getColumn().getColumnName(), proto);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#delIndex(java.lang.String)
+ */
+ @Override
+ public void delIndex(String indexName) throws IOException {
+ synchronized(indexes) {
+ indexes.remove(indexName);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getIndex(java.lang.String)
+ */
+ @Override
+ public IndexDescProto getIndex(String indexName) throws IOException {
+ return indexes.get(indexName);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getIndex(java.lang.String, java.lang.String)
+ */
+ @Override
+ public IndexDescProto getIndex(String tableName, String columnName)
+ throws IOException {
+ return indexesByColumn.get(tableName+"."+columnName);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#existIndex(java.lang.String)
+ */
+ @Override
+ public boolean existIndex(String indexName) throws IOException {
+ return indexes.containsKey(indexName);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#existIndex(java.lang.String, java.lang.String)
+ */
+ @Override
+ public boolean existIndex(String tableName, String columnName)
+ throws IOException {
+ return indexesByColumn.containsKey(tableName + "." + columnName);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getIndexes(java.lang.String)
+ */
+ @Override
+ public IndexDescProto[] getIndexes(String tableName) throws IOException {
+ List<IndexDescProto> protos = new ArrayList<IndexDescProto>();
+ for (IndexDescProto proto : indexesByColumn.values()) {
+ if (proto.getTableId().equals(tableName)) {
+ protos.add(proto);
+ }
+ }
+ return protos.toArray(new IndexDescProto[protos.size()]);
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#addFunction(FunctionDesc)
+ */
+ @Override
+ public void addFunction(FunctionDesc func) throws IOException {
+ // to be implemented
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#deleteFunction(FunctionDesc)
+ */
+ @Override
+ public void deleteFunction(FunctionDesc func) throws IOException {
+ // to be implemented
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#existFunction(FunctionDesc)
+ */
+ @Override
+ public void existFunction(FunctionDesc func) throws IOException {
+ // to be implemented
+ }
+
+ /* (non-Javadoc)
+ * @see CatalogStore#getAllFunctionNames()
+ */
+ @Override
+ public List<String> getAllFunctionNames() throws IOException {
+ // to be implemented
+ return null;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
deleted file mode 100644
index 8587b1e..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/CatalogServer.java
+++ /dev/null
@@ -1,470 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.service.AbstractService;
-import tajo.catalog.CatalogProtocol.CatalogProtocolService;
-import tajo.catalog.exception.*;
-import tajo.catalog.proto.CatalogProtos.*;
-import tajo.catalog.store.CatalogStore;
-import tajo.catalog.store.DBStore;
-import tajo.common.TajoDataTypes.DataType;
-import tajo.conf.TajoConf;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.rpc.ProtoBlockingRpcServer;
-import tajo.rpc.protocolrecords.PrimitiveProtos.BoolProto;
-import tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
-import tajo.util.NetUtils;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.net.InetSocketAddress;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * This class provides the catalog service. The catalog service enables clients
- * to register, unregister and access information about tables, functions, and
- * cluster information.
- */
-public class CatalogServer extends AbstractService {
-
- private final static Log LOG = LogFactory.getLog(CatalogServer.class);
- private TajoConf conf;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final Lock rlock = lock.readLock();
- private final Lock wlock = lock.writeLock();
-
- private CatalogStore store;
-
- private Map<String, FunctionDescProto> functions = new HashMap<String, FunctionDescProto>();
-
- // RPC variables
- private ProtoBlockingRpcServer rpcServer;
- private InetSocketAddress bindAddress;
- private String serverName;
- final CatalogProtocolHandler handler;
-
- // Server status variables
- private volatile boolean stopped = false;
- @SuppressWarnings("unused")
- private volatile boolean isOnline = false;
-
- private static BoolProto BOOL_TRUE = BoolProto.newBuilder().
- setValue(true).build();
- private static BoolProto BOOL_FALSE = BoolProto.newBuilder().
- setValue(false).build();
-
- private List<FunctionDesc> builtingFuncs;
-
- public CatalogServer() throws IOException {
- super(CatalogServer.class.getName());
- this.handler = new CatalogProtocolHandler();
- this.builtingFuncs = new ArrayList<FunctionDesc>();
- }
-
- public CatalogServer(List<FunctionDesc> sqlFuncs) throws IOException {
- this();
- this.builtingFuncs = sqlFuncs;
- }
-
- @Override
- public void init(Configuration _conf) {
- this.conf = (TajoConf) _conf;
-
- Constructor<?> cons;
- try {
- Class<?> storeClass =
- this.conf.getClass(CatalogConstants.STORE_CLASS, DBStore.class);
- LOG.info("Catalog Store Class: " + storeClass.getCanonicalName());
-
- cons = storeClass.
- getConstructor(new Class [] {Configuration.class});
-
- this.store = (CatalogStore) cons.newInstance(this.conf);
-
- initBuiltinFunctions(builtingFuncs);
- } catch (Throwable t) {
- LOG.error("cannot initialize CatalogServer", t);
- }
-
- super.init(conf);
- }
-
- private void initBuiltinFunctions(List<FunctionDesc> functions)
- throws ServiceException {
- for (FunctionDesc desc : functions) {
- handler.registerFunction(null, desc.getProto());
- }
- }
-
- public void start() {
- // Server to handle client requests.
- String serverAddr = conf.getVar(ConfVars.CATALOG_ADDRESS);
- // Creation of a HSA will force a resolve.
- InetSocketAddress initIsa = NetUtils.createSocketAddr(serverAddr);
- try {
- this.rpcServer = new ProtoBlockingRpcServer(
- CatalogProtocol.class,
- handler, initIsa);
- this.rpcServer.start();
-
- this.bindAddress = this.rpcServer.getBindAddress();
- this.serverName = tajo.util.NetUtils.getIpPortString(bindAddress);
- conf.setVar(ConfVars.CATALOG_ADDRESS, serverName);
- } catch (Exception e) {
- LOG.error("Cannot start RPC Server of CatalogServer", e);
- }
-
- LOG.info("Catalog Server startup (" + serverName + ")");
- super.start();
- }
-
- public void stop() {
- this.rpcServer.shutdown();
- LOG.info("Catalog Server (" + serverName + ") shutdown");
- super.stop();
- }
-
- public CatalogProtocolHandler getHandler() {
- return this.handler;
- }
-
- public InetSocketAddress getBindAddress() {
- return this.bindAddress;
- }
-
- public class CatalogProtocolHandler
- implements CatalogProtocolService.BlockingInterface {
-
- @Override
- public TableDescProto getTableDesc(RpcController controller,
- StringProto name)
- throws ServiceException {
- rlock.lock();
- try {
- String tableId = name.getValue().toLowerCase();
- if (!store.existTable(tableId)) {
- throw new NoSuchTableException(tableId);
- }
- return (TableDescProto) store.getTable(tableId).getProto();
- } catch (IOException ioe) {
- // TODO - handle exception
- LOG.error(ioe);
- return null;
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public GetAllTableNamesResponse getAllTableNames(RpcController controller,
- NullProto request)
- throws ServiceException {
- try {
- Iterator<String> iterator = store.getAllTableNames().iterator();
- GetAllTableNamesResponse.Builder builder =
- GetAllTableNamesResponse.newBuilder();
- while (iterator.hasNext()) {
- builder.addTableName(iterator.next());
- }
- return builder.build();
- } catch (IOException ioe) {
- // TODO - handle exception
- return null;
- }
- }
-
- @Override
- public GetFunctionsResponse getFunctions(RpcController controller,
- NullProto request)
- throws ServiceException {
- Iterator<FunctionDescProto> iterator = functions.values().iterator();
- GetFunctionsResponse.Builder builder = GetFunctionsResponse.newBuilder();
- while (iterator.hasNext()) {
- builder.addFunctionDesc(iterator.next());
- }
- return builder.build();
- }
-
- @Override
- public BoolProto addTable(RpcController controller, TableDescProto tableDesc)
- throws ServiceException {
- Preconditions.checkArgument(tableDesc.hasId(),
- "Must be set to the table name");
- Preconditions.checkArgument(tableDesc.hasPath(),
- "Must be set to the table URI");
-
- wlock.lock();
- try {
- if (store.existTable(tableDesc.getId())) {
- throw new AlreadyExistsTableException(tableDesc.getId());
- }
-
- // rewrite schema
- SchemaProto revisedSchema =
- CatalogUtil.getQualfiedSchema(tableDesc.getId(), tableDesc.getMeta()
- .getSchema());
-
- TableProto.Builder metaBuilder = TableProto.newBuilder(tableDesc.getMeta());
- metaBuilder.setSchema(revisedSchema);
- TableDescProto.Builder descBuilder = TableDescProto.newBuilder(tableDesc);
- descBuilder.setMeta(metaBuilder.build());
-
- store.addTable(new TableDescImpl(descBuilder.build()));
-
- } catch (IOException ioe) {
- LOG.error(ioe);
- } finally {
- wlock.unlock();
- LOG.info("Table " + tableDesc.getId() + " is added to the catalog ("
- + serverName + ")");
- }
-
- return BOOL_TRUE;
- }
-
- @Override
- public BoolProto deleteTable(RpcController controller, StringProto name)
- throws ServiceException {
- wlock.lock();
- try {
- String tableId = name.getValue().toLowerCase();
- if (!store.existTable(tableId)) {
- throw new NoSuchTableException(tableId);
- }
- store.deleteTable(tableId);
- } catch (IOException ioe) {
- LOG.error(ioe);
- } finally {
- wlock.unlock();
- }
-
- return BOOL_TRUE;
- }
-
- @Override
- public BoolProto existsTable(RpcController controller, StringProto name)
- throws ServiceException {
- try {
- String tableId = name.getValue().toLowerCase();
- if (store.existTable(tableId)) {
- return BOOL_TRUE;
- } else {
- return BOOL_FALSE;
- }
- } catch (IOException e) {
- LOG.error(e);
- throw new ServiceException(e);
- }
- }
-
- @Override
- public BoolProto addIndex(RpcController controller, IndexDescProto indexDesc)
- throws ServiceException {
- rlock.lock();
- try {
- if (store.existIndex(indexDesc.getName())) {
- throw new AlreadyExistsIndexException(indexDesc.getName());
- }
- store.addIndex(indexDesc);
- } catch (IOException ioe) {
- LOG.error("ERROR : cannot add index " + indexDesc.getName(), ioe);
- LOG.error(indexDesc);
- throw new ServiceException(ioe);
- } finally {
- rlock.unlock();
- }
-
- return BOOL_TRUE;
- }
-
- @Override
- public BoolProto existIndexByName(RpcController controller,
- StringProto indexName)
- throws ServiceException {
- rlock.lock();
- try {
- return BoolProto.newBuilder().setValue(
- store.existIndex(indexName.getValue())).build();
- } catch (IOException e) {
- LOG.error(e);
- return BoolProto.newBuilder().setValue(false).build();
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public BoolProto existIndex(RpcController controller,
- GetIndexRequest request)
- throws ServiceException {
- rlock.lock();
- try {
- return BoolProto.newBuilder().setValue(
- store.existIndex(request.getTableName(),
- request.getColumnName())).build();
- } catch (IOException e) {
- LOG.error(e);
- return BoolProto.newBuilder().setValue(false).build();
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public IndexDescProto getIndexByName(RpcController controller,
- StringProto indexName)
- throws ServiceException {
- rlock.lock();
- try {
- if (!store.existIndex(indexName.getValue())) {
- throw new NoSuchIndexException(indexName.getValue());
- }
- return store.getIndex(indexName.getValue());
- } catch (IOException ioe) {
- LOG.error("ERROR : cannot get index " + indexName, ioe);
- return null;
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public IndexDescProto getIndex(RpcController controller,
- GetIndexRequest request)
- throws ServiceException {
- rlock.lock();
- try {
- if (!store.existIndex(request.getTableName())) {
- throw new NoSuchIndexException(request.getTableName() + "."
- + request.getColumnName());
- }
- return store.getIndex(request.getTableName(), request.getColumnName());
- } catch (IOException ioe) {
- LOG.error("ERROR : cannot get index " + request.getTableName() + "."
- + request.getColumnName(), ioe);
- return null;
- } finally {
- rlock.unlock();
- }
- }
-
- @Override
- public BoolProto delIndex(RpcController controller, StringProto indexName)
- throws ServiceException {
- wlock.lock();
- try {
- if (!store.existIndex(indexName.getValue())) {
- throw new NoSuchIndexException(indexName.getValue());
- }
- store.delIndex(indexName.getValue());
- } catch (IOException e) {
- LOG.error(e);
- } finally {
- wlock.unlock();
- }
-
- return BOOL_TRUE;
- }
-
- @Override
- public BoolProto registerFunction(RpcController controller,
- FunctionDescProto funcDesc)
- throws ServiceException {
- String canonicalName =
- CatalogUtil.getCanonicalName(funcDesc.getSignature(),
- funcDesc.getParameterTypesList());
- if (functions.containsKey(canonicalName)) {
- throw new AlreadyExistsFunctionException(canonicalName);
- }
-
- functions.put(canonicalName, funcDesc);
- if (LOG.isDebugEnabled()) {
- LOG.info("Function " + canonicalName + " is registered.");
- }
-
- return BOOL_TRUE;
- }
-
- @Override
- public BoolProto unregisterFunction(RpcController controller,
- UnregisterFunctionRequest request)
- throws ServiceException {
- String signature = request.getSignature();
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
- }
- String canonicalName = CatalogUtil.getCanonicalName(signature, paramTypes);
- if (!functions.containsKey(canonicalName)) {
- throw new NoSuchFunctionException(canonicalName);
- }
-
- functions.remove(canonicalName);
- LOG.info("GeneralFunction " + canonicalName + " is unregistered.");
-
- return BOOL_TRUE;
- }
-
- @Override
- public FunctionDescProto getFunctionMeta(RpcController controller,
- GetFunctionMetaRequest request)
- throws ServiceException {
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
- }
- return functions.get(CatalogUtil.getCanonicalName(
- request.getSignature().toLowerCase(), paramTypes));
- }
-
- @Override
- public BoolProto containFunction(RpcController controller,
- ContainFunctionRequest request)
- throws ServiceException {
- List<DataType> paramTypes = new ArrayList<DataType>();
- int size = request.getParameterTypesCount();
- for (int i = 0; i < size; i++) {
- paramTypes.add(request.getParameterTypes(i));
- }
- boolean returnValue =
- functions.containsKey(CatalogUtil.getCanonicalName(
- request.getSignature().toLowerCase(), paramTypes));
- return BoolProto.newBuilder().setValue(returnValue).build();
- }
- }
-
- public static void main(String[] args) throws Exception {
- TajoConf conf = new TajoConf();
- CatalogServer catalog = new CatalogServer(new ArrayList<FunctionDesc>());
- catalog.init(conf);
- catalog.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
deleted file mode 100644
index 6856736..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/LocalCatalog.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- *
- */
-package tajo.catalog;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import tajo.conf.TajoConf;
-
-import java.io.IOException;
-
-/**
- * This class provides a catalog service interface in
- * local.
- */
-public class LocalCatalog extends AbstractCatalogClient {
- private static final Log LOG = LogFactory.getLog(LocalCatalog.class);
- private CatalogServer catalog;
-
- public LocalCatalog(final TajoConf conf) throws IOException {
- this.catalog = new CatalogServer();
- this.catalog.init(conf);
- this.catalog.start();
- setStub(catalog.getHandler());
- }
-
- public LocalCatalog(final CatalogServer server) {
- this.catalog = server;
- setStub(server.getHandler());
- }
-
- public void shutdown() {
- this.catalog.stop();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
deleted file mode 100644
index 5117fe4..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/MiniCatalogServer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog;
-
-import tajo.conf.TajoConf;
-
-import java.io.IOException;
-
-public class MiniCatalogServer {
- private CatalogServer catalogServers;
-
- public MiniCatalogServer(TajoConf conf) throws IOException {
- catalogServers = new CatalogServer();
- catalogServers.init(conf);
- catalogServers.start();
- }
-
- public void shutdown() {
- this.catalogServers.stop();
- }
-
- public CatalogServer getCatalogServer() {
- return this.catalogServers;
- }
-
- public CatalogService getCatalog() {
- return new LocalCatalog(this.catalogServers);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
deleted file mode 100644
index c4c7eaa..0000000
--- a/tajo-catalog/tajo-catalog-server/src/main/java/tajo/catalog/store/CatalogStore.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.catalog.store;
-
-import tajo.catalog.FunctionDesc;
-import tajo.catalog.TableDesc;
-import tajo.catalog.proto.CatalogProtos.IndexDescProto;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-public interface CatalogStore extends Closeable {
- void addTable(TableDesc desc) throws IOException;
-
- boolean existTable(String name) throws IOException;
-
- void deleteTable(String name) throws IOException;
-
- TableDesc getTable(String name) throws IOException;
-
- List<String> getAllTableNames() throws IOException;
-
- void addIndex(IndexDescProto proto) throws IOException;
-
- void delIndex(String indexName) throws IOException;
-
- IndexDescProto getIndex(String indexName) throws IOException;
-
- IndexDescProto getIndex(String tableName, String columnName)
- throws IOException;
-
- boolean existIndex(String indexName) throws IOException;
-
- boolean existIndex(String tableName, String columnName) throws IOException;
-
- IndexDescProto [] getIndexes(String tableName) throws IOException;
-
- void addFunction(FunctionDesc func) throws IOException;
-
- void deleteFunction(FunctionDesc func) throws IOException;
-
- void existFunction(FunctionDesc func) throws IOException;
-
- List<String> getAllFunctionNames() throws IOException;
-}