You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/09 02:56:23 UTC
[02/10] tajo git commit: TAJO-1213: Implement
CatalogStore::updateTableStats. (jaehwa)
TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa)
Closes #285
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/95cf4b94
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/95cf4b94
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/95cf4b94
Branch: refs/heads/index_support
Commit: 95cf4b9432a02fdbf9880b204c3db718e2bd2468
Parents: ab2efce
Author: JaeHwa Jung <bl...@apache.org>
Authored: Sun Dec 7 15:38:43 2014 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Sun Dec 7 15:40:24 2014 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 14 ++++
.../src/main/proto/CatalogProtocol.proto | 1 +
.../org/apache/tajo/catalog/CatalogService.java | 5 ++
.../src/main/proto/CatalogProtos.proto | 5 ++
.../tajo/catalog/store/HCatalogStore.java | 6 ++
.../org/apache/tajo/catalog/CatalogServer.java | 22 +++++++
.../tajo/catalog/store/AbstractDBStore.java | 68 ++++++++++++++++++++
.../apache/tajo/catalog/store/CatalogStore.java | 4 +-
.../org/apache/tajo/catalog/store/MemStore.java | 17 +++++
.../org/apache/tajo/master/GlobalEngine.java | 8 ++-
.../apache/tajo/master/querymaster/Query.java | 9 ++-
12 files changed, 156 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8ecd795..a59e107 100644
--- a/CHANGES
+++ b/CHANGES
@@ -18,6 +18,8 @@ Release 0.9.1 - unreleased
IMPROVEMENT
+ TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa)
+
TAJO-1165: Needs to show error messages on query_executor.jsp.
(Jihun Kang via jaehwa)
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 1f1e808..dde6980 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -658,4 +658,18 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
}
+ @Override
+ public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
+ try {
+ return new ServerCallable<Boolean>(pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+ public Boolean call(NettyClientBase client) throws ServiceException {
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ return stub.updateTableStats(null, updateTableStatsProto).getValue();
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index c5cb528..adf0740 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -33,6 +33,7 @@ service CatalogProtocolService {
rpc getTablespace(StringProto) returns (TablespaceProto);
rpc alterTablespace(AlterTablespaceProto) returns (BoolProto);
rpc alterTable(AlterTableDescProto) returns (BoolProto);
+ rpc updateTableStats(UpdateTableStatsProto) returns (BoolProto);
rpc createDatabase(CreateDatabaseRequest) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
index 667ee88..b41b636 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -27,6 +27,8 @@ import java.util.Collection;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
import static org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+
public interface CatalogService {
@@ -181,4 +183,7 @@ public interface CatalogService {
* @throws Throwable
*/
boolean alterTable(AlterTableDesc desc);
+
+ boolean updateTableStats(UpdateTableStatsProto stats);
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index f29bc6c..22c08d8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -280,6 +280,11 @@ message AlterColumnProto {
required string newColumnName = 2;
}
+message UpdateTableStatsProto {
+ required string tableName = 1;
+ required TableStatsProto stats = 2;
+}
+
////////////////////////////////////////////////
// Function and UDF Section
////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index fa1cfd6..ad0aee3 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -333,6 +333,12 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
}
@Override
+ public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws
+ CatalogException {
+ // TODO - not implemented yet
+ }
+
+ @Override
public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException {
throw new CatalogException("tablespace concept is not supported in HCatalogStore");
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index 03ae920..57086e2 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
/**
* This class provides the catalog service. The catalog service enables clients
@@ -366,6 +367,27 @@ public class CatalogServer extends AbstractService {
}
@Override
+ public BoolProto updateTableStats(RpcController controller, UpdateTableStatsProto proto) throws
+ ServiceException {
+ wlock.lock();
+ try {
+ String [] split = CatalogUtil.splitTableName(proto.getTableName());
+ if (!store.existTable(split[0], split[1])) {
+ throw new NoSuchTableException(proto.getTableName());
+ }
+ store.updateTableStats(proto);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ return BOOL_FALSE;
+ } finally {
+ wlock.unlock();
+ LOG.info("Table " + proto.getTableName() + " is updated in the catalog ("
+ + bindAddressStr + ")");
+ }
+ return BOOL_TRUE;
+ }
+
+ @Override
public BoolProto alterTable(RpcController controller, AlterTableDescProto proto) throws ServiceException {
wlock.lock();
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index 7c1baab..c7d55eb 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -823,6 +823,74 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
@Override
+ public void updateTableStats(final CatalogProtos.UpdateTableStatsProto statsProto) throws
+ CatalogException {
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet res = null;
+
+ try {
+ conn = getConnection();
+ conn.setAutoCommit(false);
+
+ String[] splitted = CatalogUtil.splitTableName(statsProto.getTableName());
+ if (splitted.length == 1) {
+ throw new IllegalArgumentException("updateTableStats() requires a qualified table name, but it is \""
+ + statsProto.getTableName() + "\".");
+ }
+ String databaseName = splitted[0];
+ String tableName = splitted[1];
+
+ int dbid = getDatabaseId(databaseName);
+
+ String tidSql =
+ "SELECT TID from " + TB_TABLES + " WHERE " + COL_DATABASES_PK + "=? AND " + COL_TABLES_NAME + "=?";
+ pstmt = conn.prepareStatement(tidSql);
+ pstmt.setInt(1, dbid);
+ pstmt.setString(2, tableName);
+ res = pstmt.executeQuery();
+
+ if (!res.next()) {
+ throw new CatalogException("ERROR: there is no TID matched to " + statsProto.getTableName());
+ }
+
+ int tableId = res.getInt("TID");
+ res.close();
+ pstmt.close();
+
+ if (statsProto.hasStats()) {
+
+ String statSql = "UPDATE " + TB_STATISTICS + " SET NUM_ROWS = ?, " +
+ "NUM_BYTES = ? WHERE TID = ?";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(statSql);
+ }
+
+ pstmt = conn.prepareStatement(statSql);
+ pstmt.setInt(1, tableId);
+ pstmt.setLong(2, statsProto.getStats().getNumRows());
+ pstmt.setLong(3, statsProto.getStats().getNumBytes());
+ pstmt.executeUpdate();
+ }
+
+ // If there is no error, commit the changes.
+ conn.commit();
+ } catch (SQLException se) {
+ if (conn != null) {
+ try {
+ conn.rollback();
+ } catch (SQLException e) {
+ LOG.error(e);
+ }
+ }
+ throw new CatalogException(se);
+ } finally {
+ CatalogUtil.closeQuietly(pstmt, res);
+ }
+ }
+
+ @Override
public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException {
String[] splitted = CatalogUtil.splitTableName(alterTableDescProto.getTableName());
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index 5de9633..041fc52 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -68,7 +68,9 @@ public interface CatalogStore extends Closeable {
void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException;
- /************************ PARTITION METHOD **************************/
+ void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws CatalogException;
+
+ /************************ PARTITION METHOD **************************/
void addPartitionMethod(PartitionMethodProto partitionMethodProto) throws CatalogException;
PartitionMethodProto getPartitionMethod(String databaseName, String tableName)
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index ca99160..9575c13 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -172,6 +172,23 @@ public class MemStore implements CatalogStore {
}
@Override
+ public void updateTableStats(CatalogProtos.UpdateTableStatsProto request) throws CatalogException {
+ String [] splitted = CatalogUtil.splitTableName(request.getTableName());
+ if (splitted.length == 1) {
+ throw new IllegalArgumentException("createTable() requires a qualified table name, but it is \""
+ + request.getTableName() + "\".");
+ }
+ String databaseName = splitted[0];
+ String tableName = splitted[1];
+
+ final Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, databaseName);
+ final CatalogProtos.TableDescProto tableDescProto = database.get(tableName);
+ CatalogProtos.TableDescProto newTableDescProto = tableDescProto.toBuilder().setStats(request
+ .getStats().toBuilder()).build();
+ database.put(tableName, newTableDescProto);
+ }
+
+ @Override
public boolean existTable(String dbName, String tbName) throws CatalogException {
Map<String, CatalogProtos.TableDescProto> database = checkAndGetDatabaseNS(databases, dbName);
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 9bf9a75..821d440 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -74,6 +74,7 @@ import java.util.List;
import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
+import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
@@ -398,8 +399,11 @@ public class GlobalEngine extends AbstractService {
stats.setNumBytes(volume);
stats.setNumRows(1);
- catalog.dropTable(insertNode.getTableName());
- catalog.createTable(tableDesc);
+ UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
+ builder.setTableName(tableDesc.getName());
+ builder.setStats(stats.getProto());
+
+ catalog.updateTableStats(builder.build());
responseBuilder.setTableDesc(tableDesc.getProto());
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/95cf4b94/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index 6f80171..f92001f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -34,6 +34,8 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
+import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
@@ -916,8 +918,11 @@ public class Query implements EventHandler<QueryEvent> {
finalTable.setStats(stats);
if (insertNode.hasTargetTable()) {
- catalog.dropTable(insertNode.getTableName());
- catalog.createTable(finalTable);
+ UpdateTableStatsProto.Builder builder = UpdateTableStatsProto.newBuilder();
+ builder.setTableName(finalTable.getName());
+ builder.setStats(stats.getProto());
+
+ catalog.updateTableStats(builder.build());
}
query.setResultDesc(finalTable);