You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/03/27 04:01:05 UTC
[4/6] tajo git commit: TAJO-1284: Add alter partition method to
CatalogStore. (jaehwa)
TAJO-1284: Add alter partition method to CatalogStore. (jaehwa)
Closes #448
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cad54428
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cad54428
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cad54428
Branch: refs/heads/index_support
Commit: cad54428dd803cdb8caf3d51a9ba7e1d83a99b01
Parents: a01292f
Author: JaeHwa Jung <bl...@apache.org>
Authored: Fri Mar 27 11:01:32 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Fri Mar 27 11:01:32 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 45 ++-
.../src/main/proto/CatalogProtocol.proto | 7 +-
.../org/apache/tajo/catalog/AlterTableDesc.java | 21 +-
.../org/apache/tajo/catalog/AlterTableType.java | 2 +-
.../apache/tajo/catalog/CatalogConstants.java | 5 +
.../org/apache/tajo/catalog/CatalogService.java | 9 +-
.../AlreadyExistsPartitionException.java | 33 ++
.../exception/NoSuchPartitionException.java | 39 +++
.../tajo/catalog/partition/PartitionDesc.java | 97 +++---
.../tajo/catalog/partition/PartitionKey.java | 147 ++++++++
.../src/main/proto/CatalogProtos.proto | 30 +-
.../tajo/catalog/store/HCatalogStore.java | 125 +++++--
.../tajo/catalog/store/TestHCatalogStore.java | 77 ++++-
.../org/apache/tajo/catalog/CatalogServer.java | 100 ++++--
.../InfoSchemaMetadataDictionary.java | 2 +
.../PartitionKeysTableDescriptor.java | 46 +++
.../dictionary/PartitionsTableDescriptor.java | 3 +-
.../tajo/catalog/store/AbstractDBStore.java | 337 +++++++++++++------
.../store/AbstractMySQLMariaDBStore.java | 14 +
.../apache/tajo/catalog/store/CatalogStore.java | 14 +-
.../org/apache/tajo/catalog/store/MemStore.java | 109 ++++--
.../src/main/resources/schemas/derby/derby.xml | 27 +-
.../schemas/mariadb/partition_keys.sql | 6 +
.../resources/schemas/mariadb/partitions.sql | 13 +-
.../resources/schemas/mysql/partition_keys.sql | 6 +
.../main/resources/schemas/mysql/partitions.sql | 13 +-
.../main/resources/schemas/oracle/oracle.xml | 45 ++-
.../resources/schemas/postgresql/postgresql.xml | 16 +-
.../org/apache/tajo/catalog/TestCatalog.java | 73 +++-
.../NonForwardQueryResultSystemScanner.java | 6 +-
.../tajo/jdbc/util/TestQueryStringDecoder.java | 14 +-
32 files changed, 1180 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 575a389..029735f 100644
--- a/CHANGES
+++ b/CHANGES
@@ -108,6 +108,8 @@ Release 0.11.0 - unreleased
SUB TASKS
+ TAJO-1284: Add alter partition method to CatalogStore. (jaehwa)
+
TAJO-1392: Resolve findbug warnings on Tajo Plan Module. (jihun)
TAJO-1393: Resolve findbug warnings on Tajo Cli Module.
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index d8350a3..458d6e0 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -410,7 +410,50 @@ public abstract class AbstractCatalogClient implements CatalogService {
return false;
}
}
-
+
+ @Override
+ public final PartitionDescProto getPartition(final String databaseName, final String tableName,
+ final String partitionName) {
+ try {
+ return new ServerCallable<PartitionDescProto>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) {
+ public PartitionDescProto call(NettyClientBase client) throws ServiceException {
+
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
+ builder.setPartitionName(partitionName);
+
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ return stub.getPartitionByPartitionName(null, builder.build());
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
+ try {
+ return new ServerCallable<List<PartitionDescProto>>(this.pool, getCatalogServerAddr(), CatalogProtocol.class,
+ false) {
+ public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
+
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
+
+ CatalogProtocolService.BlockingInterface stub = getStub(client);
+ PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
+ return response.getPartitionList();
+ }
+ }.withRetries();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
@Override
public List<TablePartitionProto> getAllPartitions() {
try {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
index cae5d88..5ace32e 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
+++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto
@@ -56,11 +56,8 @@ service CatalogProtocolService {
rpc existPartitionMethod(TableIdentifierProto) returns (BoolProto);
rpc dropPartitionMethod(TableIdentifierProto) returns (BoolProto);
- rpc addPartitions(PartitionsProto) returns (BoolProto);
- rpc addPartition(PartitionDescProto) returns (BoolProto);
- rpc getPartitionByPartitionName(StringProto) returns (PartitionDescProto);
- rpc getPartitionsByTableName(StringProto) returns (PartitionsProto);
- rpc delAllPartitions(StringProto) returns (PartitionsProto);
+ rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (PartitionDescProto);
+ rpc getPartitionsByTableName(PartitionIdentifierProto) returns (PartitionsProto);
rpc getAllPartitions(NullProto) returns (GetTablePartitionsProto);
rpc createIndex(IndexDescProto) returns (BoolProto);
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java
index 69d5be4..f1265fb 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java
@@ -21,6 +21,7 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.GsonObject;
@@ -40,7 +41,9 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj
@Expose
protected String newColumnName; //optional
@Expose
- protected Column addColumn = null; //optiona
+ protected Column addColumn = null; //optional
+ @Expose
+ protected PartitionDesc partitionDesc; //optional
public AlterTableDesc() {
}
@@ -94,6 +97,10 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj
this.alterTableType = alterTableType;
}
+ public PartitionDesc getPartitionDesc() { return partitionDesc; }
+
+ public void setPartitionDesc(PartitionDesc partitionDesc) { this.partitionDesc = partitionDesc; }
+
@Override
public String toString() {
Gson gson = new GsonBuilder().setPrettyPrinting().
@@ -109,6 +116,7 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj
newAlter.newTableName = newTableName;
newAlter.columnName = newColumnName;
newAlter.addColumn = addColumn;
+ newAlter.partitionDesc = partitionDesc;
return newAlter;
}
@@ -147,8 +155,19 @@ public class AlterTableDesc implements ProtoObject<AlterTableDescProto>, GsonObj
case ADD_COLUMN:
builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_COLUMN);
break;
+ case ADD_PARTITION:
+ builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_PARTITION);
+ break;
+ case DROP_PARTITION:
+ builder.setAlterTableType(CatalogProtos.AlterTableType.DROP_PARTITION);
+ break;
default:
}
+
+ if (null != this.partitionDesc) {
+ builder.setPartitionDesc(partitionDesc.getProto());
+ }
+
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java
index 0b7639c..7e3be91 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java
@@ -18,5 +18,5 @@
package org.apache.tajo.catalog;
public enum AlterTableType {
- RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN
+ RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
index a8c5c9b..8265e38 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java
@@ -48,11 +48,16 @@ public class CatalogConstants {
public static final String TB_STATISTICS = "STATS";
public static final String TB_PARTITION_METHODS = "PARTITION_METHODS";
public static final String TB_PARTTIONS = "PARTITIONS";
+ public static final String TB_PARTTION_KEYS = "PARTITION_KEYS";
public static final String COL_TABLESPACE_PK = "SPACE_ID";
public static final String COL_DATABASES_PK = "DB_ID";
public static final String COL_TABLES_PK = "TID";
public static final String COL_TABLES_NAME = "TABLE_NAME";
+
+ public static final String COL_PARTITIONS_PK = "PARTITION_ID";
+ public static final String COL_COLUMN_NAME = "COLUMN_NAME";
+ public static final String COL_PARTITION_VALUE = "PARTITION_VALUE";
public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema";
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
index 2a5d890..86b773b 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java
@@ -19,6 +19,7 @@
package org.apache.tajo.catalog;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto;
@@ -183,7 +184,11 @@ public interface CatalogService {
PartitionMethodDesc getPartitionMethod(String databaseName, String tableName);
boolean existPartitionMethod(String databaseName, String tableName);
-
+
+ CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, String partitionName);
+
+ List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName);
+
List<TablePartitionProto> getAllPartitions();
boolean createIndex(IndexDesc index);
@@ -221,4 +226,6 @@ public interface CatalogService {
boolean updateTableStats(UpdateTableStatsProto stats);
+
+
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java
new file mode 100644
index 0000000..ab6144f
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+public class AlreadyExistsPartitionException extends RuntimeException {
+
+ private static final long serialVersionUID = 277182608283894930L;
+
+ public AlreadyExistsPartitionException(String message) {
+ super(message);
+ }
+
+ public AlreadyExistsPartitionException(String databaseName, String tableName, String partitionName) {
+ super(String.format("ERROR: \"%s already exist in \"%s.%s\"", partitionName, databaseName, tableName));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
new file mode 100644
index 0000000..45c9299
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.exception;
+
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.function.FunctionUtil;
+import org.codehaus.jackson.schema.JsonSerializableSchema;
+
+import java.util.Collection;
+
+public class NoSuchPartitionException extends RuntimeException {
+
+ private static final long serialVersionUID = 277182608283894938L;
+
+ public NoSuchPartitionException(String message) {
+ super(message);
+ }
+
+ public NoSuchPartitionException(String databaseName, String tableName, String partitionName) {
+ super(String.format("ERROR: \"%s\" does not exist in \"%s.%s\".", partitionName, databaseName, tableName));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
index d775ba8..b6d883d 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java
@@ -19,20 +19,44 @@
package org.apache.tajo.catalog.partition;
import com.google.common.base.Objects;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.json.GsonObject;
+import java.util.ArrayList;
+import java.util.List;
+
/**
- * <code>PartitionDesc</code> presents a table partition.
+ * This represents a single partition of a column-partitioned table.
+ * Each partition has its own name, partition path, and column name/partition value pairs.
+ *
+ * For example, consider you have a partitioned table as follows:
+ *
+ * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text,
+ * gender text) USING RCFILE LOCATION '/tajo/data/table1';
+ *
+ * Then, its data will be stored on HDFS as follows:
+ * - /tajo/data/table1/dt=20150301/phone=1300/gender=m
+ * - /tajo/data/table1/dt=20150301/phone=1300/gender=f
+ * - /tajo/data/table1/dt=20150302/phone=1500/gender=m
+ * - /tajo/data/table1/dt=20150302/phone=1500/gender=f
+ *
+ * In the example above, the first directory can be represented with this class as follows:
+ * - partitionName : dt=20150301/phone=1300/gender=m
+ * - path: /tajo/data/table1/dt=20150301/phone=1300/gender=m
+ * - partitionKeys:
+ * dt=20150301, phone=1300, gender=m
+ *
*/
public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescProto>, Cloneable, GsonObject {
- @Expose protected String partitionName; // optional
- @Expose protected int ordinalPosition; // required
- @Expose protected String partitionValue; // optional
- @Expose protected String path; // optional
+ @Expose protected String partitionName;
+ @Expose protected List<PartitionKey> partitionKeys;
+ @Expose protected String path; //optional
private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
@@ -41,8 +65,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
public PartitionDesc(PartitionDesc partition) {
this.partitionName = partition.partitionName;
- this.ordinalPosition = partition.ordinalPosition;
- this.partitionValue = partition.partitionValue;
+ this.partitionKeys = partition.partitionKeys;
this.path = partition.path;
}
@@ -50,46 +73,44 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
if(proto.hasPartitionName()) {
this.partitionName = proto.getPartitionName();
}
- this.ordinalPosition = proto.getOrdinalPosition();
- if(proto.hasPartitionValue()) {
- this.partitionValue = proto.getPartitionValue();
+
+ this.partitionKeys = new ArrayList<PartitionKey>();
+ for(CatalogProtos.PartitionKeyProto keyProto : proto.getPartitionKeysList()) {
+ PartitionKey partitionKey = new PartitionKey(keyProto);
+ this.partitionKeys.add(partitionKey);
}
+
if(proto.hasPath()) {
this.path = proto.getPath();
}
}
- public void setName(String partitionName) {
- this.partitionName = partitionName;
- }
- public String getName() {
+ public String getPartitionName() {
return partitionName;
}
-
- public void setOrdinalPosition(int ordinalPosition) {
- this.ordinalPosition = ordinalPosition;
- }
- public int getOrdinalPosition() {
- return ordinalPosition;
+ public void setPartitionName(String partitionName) {
+ this.partitionName = partitionName;
}
- public void setPartitionValue(String partitionValue) {
- this.partitionValue = partitionValue;
+ public List<PartitionKey> getPartitionKeys() {
+ return partitionKeys;
}
- public String getPartitionValue() {
- return partitionValue;
+
+ public void setPartitionKeys(List<PartitionKey> partitionKeys) {
+ this.partitionKeys = partitionKeys;
}
public void setPath(String path) {
this.path = path;
}
+
public String getPath() {
return path;
}
public int hashCode() {
- return Objects.hashCode(partitionName, ordinalPosition, partitionValue, path);
+ return Objects.hashCode(partitionName, partitionKeys, path);
}
public boolean equals(Object o) {
@@ -98,10 +119,9 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
boolean eq = ((partitionName != null && another.partitionName != null
&& partitionName.equals(another.partitionName)) ||
(partitionName == null && another.partitionName == null));
- eq = eq && (ordinalPosition == another.ordinalPosition);
- eq = eq && ((partitionValue != null && another.partitionValue != null
- && partitionValue.equals(another.partitionValue))
- || (partitionValue == null && another.partitionValue == null));
+ eq = eq && ((partitionKeys != null && another.partitionKeys != null
+ && partitionKeys.equals(another.partitionKeys))
+ || (partitionKeys == null && another.partitionKeys == null));
eq = eq && ((path != null && another.path != null && path.equals(another.path)) ||
(path == null && another.path == null));
return eq;
@@ -117,13 +137,14 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
}
if(this.partitionName != null) {
- builder.setPartitionName(partitionName);
+ builder.setPartitionName(this.partitionName);
}
- builder.setOrdinalPosition(this.ordinalPosition);
-
- if (this.partitionValue != null) {
- builder.setPartitionValue(this.partitionValue);
+ builder.clearPartitionKeys();
+ if (this.partitionKeys != null) {
+ for(PartitionKey partitionKey : this.partitionKeys) {
+ builder.addPartitionKeys(partitionKey.getProto());
+ }
}
if(this.path != null) {
@@ -134,8 +155,9 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
}
public String toString() {
- StringBuilder sb = new StringBuilder("name: " + partitionName);
- return sb.toString();
+ Gson gson = new GsonBuilder().setPrettyPrinting().
+ excludeFieldsWithoutExposeAnnotation().create();
+ return gson.toJson(this);
}
@Override
@@ -151,8 +173,7 @@ public class PartitionDesc implements ProtoObject<CatalogProtos.PartitionDescPro
PartitionDesc desc = (PartitionDesc) super.clone();
desc.builder = CatalogProtos.PartitionDescProto.newBuilder();
desc.partitionName = partitionName;
- desc.ordinalPosition = ordinalPosition;
- desc.partitionValue = partitionValue;
+ desc.partitionKeys = partitionKeys;
desc.path = path;
return desc;
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java
new file mode 100644
index 0000000..085598b
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.partition;
+
+import com.google.common.base.Objects;
+import com.google.gson.annotations.Expose;
+import org.apache.tajo.catalog.json.CatalogGsonHelper;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.json.GsonObject;
+import org.apache.tajo.util.TUtil;
+
+
+/**
+ * This represents a column name and partition value pair of a column-partitioned table.
+ *
+ * For example, consider you have a partitioned table as follows:
+ *
+ * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text,
+ * gender text) USING RCFILE LOCATION '/tajo/data/table1';
+ *
+ * Then, its data will be stored on HDFS as follows:
+ * - /tajo/data/table1/dt=20150301/phone=1300/gender=m
+ * - /tajo/data/table1/dt=20150301/phone=1300/gender=f
+ * - /tajo/data/table1/dt=20150302/phone=1500/gender=m
+ * - /tajo/data/table1/dt=20150302/phone=1500/gender=f
+ *
+ * In the example above, the first directory can be represented with this class as follows:
+ * The first pair: column name = dt, partition value = 20150301
+ * The second pair: column name = phone, partition value = 1300
+ * The third pair: column name = gender, partition value = m
+ *
+ */
+public class PartitionKey implements ProtoObject<CatalogProtos.PartitionKeyProto>, Cloneable, GsonObject {
+ @Expose protected String columnName; // required
+ @Expose protected String partitionValue; // required
+
+ private CatalogProtos.PartitionKeyProto.Builder builder = CatalogProtos.PartitionKeyProto.newBuilder();
+
+ public PartitionKey() {
+ }
+
+ public PartitionKey(String columnName, String partitionValue) {
+ this.columnName = columnName;
+ this.partitionValue = partitionValue;
+ }
+
+ public PartitionKey(PartitionKey partition) {
+ this.columnName = partition.columnName;
+ this.partitionValue = partition.partitionValue;
+ }
+
+ public PartitionKey(CatalogProtos.PartitionKeyProto proto) {
+ if (proto.hasColumnName()) {
+ this.columnName = proto.getColumnName();
+ }
+ if (proto.hasPartitionValue()) {
+ this.partitionValue = proto.getPartitionValue();
+ }
+ }
+
+ public String getPartitionValue() {
+ return partitionValue;
+ }
+
+ public void setPartitionValue(String partitionValue) {
+ this.partitionValue = partitionValue;
+ }
+
+ public String getColumnName() {
+ return columnName;
+ }
+
+ public void setColumnName(String columnName) {
+ this.columnName = columnName;
+ }
+
+ public int hashCode() {
+ return Objects.hashCode(partitionValue, columnName);
+ }
+
+ public boolean equals(Object o) {
+ if (o instanceof PartitionKey) {
+ PartitionKey another = (PartitionKey) o;
+ return TUtil.checkEquals(columnName, another.columnName) &&
+ TUtil.checkEquals(partitionValue, another.partitionValue);
+ }
+ return false;
+ }
+
+ @Override
+ public CatalogProtos.PartitionKeyProto getProto() {
+ if (builder == null) {
+ builder = CatalogProtos.PartitionKeyProto.newBuilder();
+ }
+
+ if (this.columnName != null) {
+ builder.setColumnName(this.columnName);
+ }
+
+ if (this.partitionValue != null) {
+ builder.setPartitionValue(this.partitionValue);
+ }
+
+ return builder.build();
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder("name: " + partitionValue);
+ return sb.toString();
+ }
+
+ @Override
+ public String toJson() {
+ return CatalogGsonHelper.toJson(this, PartitionKey.class);
+ }
+
+ public static PartitionKey fromJson(String strVal) {
+ return strVal != null ? CatalogGsonHelper.fromJson(strVal, PartitionKey.class) : null;
+ }
+
+ public Object clone() throws CloneNotSupportedException {
+ PartitionKey desc = (PartitionKey) super.clone();
+ desc.builder = CatalogProtos.PartitionKeyProto.newBuilder();
+ desc.partitionValue = partitionValue;
+ desc.columnName = columnName;
+
+ return desc;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index a204685..3abd840 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -58,6 +58,8 @@ enum AlterTableType {
RENAME_TABLE = 0;
RENAME_COLUMN = 1;
ADD_COLUMN = 2;
+ ADD_PARTITION = 3;
+ DROP_PARTITION = 4;
}
message ColumnProto {
@@ -184,11 +186,10 @@ message TableOptionProto {
}
message TablePartitionProto {
- required int32 pid = 1;
+ required int32 partition_id = 1;
required int32 tid = 2;
optional string partitionName = 3;
- required int32 ordinalPosition = 4;
- optional string path = 5;
+ optional string path = 4;
}
message GetIndexByColumnRequest {
@@ -281,8 +282,7 @@ message SortSpecProto {
message PartitionsProto {
- required TableIdentifierProto tableIdentifier = 1;
- repeated PartitionDescProto partition = 2;
+ repeated PartitionDescProto partition = 1;
}
message PartitionMethodProto {
@@ -293,10 +293,21 @@ message PartitionMethodProto {
}
message PartitionDescProto {
- optional string partitionName = 2;
- required int32 ordinalPosition = 3;
- optional string partitionValue = 4;
- optional string path = 5;
+ required string partitionName = 1;
+ repeated PartitionKeyProto partitionKeys = 2;
+ optional string path = 3;
+}
+
+message PartitionKeyProto {
+ required string columnName = 1;
+ required string partitionValue = 2;
+}
+
+
+message PartitionIdentifierProto {
+ required string databaseName = 1;
+ required string tableName = 2;
+ optional string partitionName = 3;
}
message TablespaceProto {
@@ -345,6 +356,7 @@ message AlterTableDescProto {
optional ColumnProto addColumn = 3;
optional AlterColumnProto alterColumnName = 4;
required AlterTableType alterTableType = 5;
+ optional PartitionDescProto partitionDesc = 6;
}
message AlterColumnProto {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
index 2c3fc6a..2761517 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java
@@ -608,7 +608,8 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
final String databaseName = split[0];
final String tableName = split[1];
-
+ String partitionName = null;
+ CatalogProtos.PartitionDescProto partitionDesc = null;
switch (alterTableDescProto.getAlterTableType()) {
case RENAME_TABLE:
@@ -629,6 +630,22 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
}
addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn());
break;
+ case ADD_PARTITION:
+ partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc != null) {
+ throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
+ }
+ addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc());
+ break;
+ case DROP_PARTITION:
+ partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc == null) {
+ throw new NoSuchPartitionException(databaseName, tableName, partitionName);
+ }
+ dropPartition(databaseName, tableName, partitionDesc);
+ break;
default:
//TODO
}
@@ -701,6 +718,59 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
}
}
+  /**
+   * Registers one partition of the given table through the Hive client.
+   *
+   * @param databaseName database that owns the table
+   * @param tableName table that receives the partition
+   * @param partitionDescProto partition to add; its key values and path are used
+   * @throws CatalogException wrapping any failure raised while building or adding the partition
+   */
+  private void addPartition(String databaseName, String tableName,
+                            CatalogProtos.PartitionDescProto partitionDescProto) {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+    try {
+      client = clientPool.getClient();
+
+      // The partition takes the table's storage descriptor, re-pointed at the partition's own path.
+      StorageDescriptor storage = client.getHiveClient().getTable(databaseName, tableName).getSd();
+      storage.setLocation(partitionDescProto.getPath());
+
+      List<String> partitionValues = Lists.newArrayList();
+      for (CatalogProtos.PartitionKeyProto key : partitionDescProto.getPartitionKeysList()) {
+        partitionValues.add(key.getPartitionValue());
+      }
+
+      Partition hivePartition = new Partition();
+      hivePartition.setDbName(databaseName);
+      hivePartition.setTableName(tableName);
+      hivePartition.setValues(partitionValues);
+      hivePartition.setSd(storage);
+
+      client.getHiveClient().add_partition(hivePartition);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
+  /**
+   * Drops one partition of the given table through the Hive client.
+   *
+   * @param databaseName database that owns the table
+   * @param tableName table that owns the partition
+   * @param partitionDescProto partition to drop; only its partition-key values are used
+   * @throws CatalogException wrapping any failure raised while dropping the partition
+   */
+  private void dropPartition(String databaseName, String tableName,
+                             CatalogProtos.PartitionDescProto partitionDescProto) {
+    HCatalogStoreClientPool.HCatalogStoreClient client = null;
+    try {
+      client = clientPool.getClient();
+
+      List<String> partitionValues = Lists.newArrayList();
+      for (CatalogProtos.PartitionKeyProto key : partitionDescProto.getPartitionKeysList()) {
+        partitionValues.add(key.getPartitionValue());
+      }
+
+      // NOTE(review): the trailing `true` is presumably Hive's deleteData flag -- confirm against the client API.
+      client.getHiveClient().dropPartition(databaseName, tableName, partitionValues, true);
+    } catch (Exception e) {
+      throw new CatalogException(e);
+    } finally {
+      if (client != null) {
+        client.release();
+      }
+    }
+  }
+
@Override
public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException {
// TODO - not implemented yet
@@ -723,35 +793,48 @@ public class HCatalogStore extends CatalogConstants implements CatalogStore {
}
+ /**
+  * Not supported by this store: every call throws {@link UnsupportedOperationException}.
+  */
@Override
- public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException {
- // TODO - not implemented yet
+ public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName,
+ String tableName) throws CatalogException {
+ throw new UnsupportedOperationException();
}
- @Override
- public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException {
-
- }
+ /**
+  * Fetches one partition of a table through the Hive client.
+  *
+  * @param databaseName database that owns the table
+  * @param tableName table that owns the partition
+  * @param partitionName partition name; it is split on "/" and "=" to recover the column names
+  * @return a descriptor holding the name, the storage location and one key per partition value,
+  *         or null when the Hive client raises NoSuchObjectException
+  * @throws CatalogException wrapping any other failure
+  */
@Override
- public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException {
- return null; // TODO - not implemented yet
- }
+ public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName,
+ String partitionName) throws CatalogException {
+ HCatalogStoreClientPool.HCatalogStoreClient client = null;
+ CatalogProtos.PartitionDescProto.Builder builder = null;
- @Override
- public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException {
- return null; // TODO - not implemented yet
- }
+ try {
+ client = clientPool.getClient();
- @Override
- public void delPartition(String partitionName) throws CatalogException {
- // TODO - not implemented yet
- }
+ Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName);
+ builder = CatalogProtos.PartitionDescProto.newBuilder();
+ builder.setPartitionName(partitionName);
+ builder.setPath(partition.getSd().getLocation());
- @Override
- public void dropPartitions(String tableName) throws CatalogException {
+ // Each "/"-separated segment is read as column=value; only the column part is taken from the name.
+ String[] partitionNames = partitionName.split("/");
- }
+ // The i-th value returned by Hive is paired with the column of the i-th name segment.
+ for (int i = 0; i < partition.getValues().size(); i++) {
+ String value = partition.getValues().get(i);
+ CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
+ String columnName = partitionNames[i].split("=")[0];
+ keyBuilder.setColumnName(columnName);
+ keyBuilder.setPartitionValue(value);
+ builder.addPartitionKeys(keyBuilder);
+ }
+ } catch (NoSuchObjectException e) {
+ // A missing partition is reported to the caller as null rather than as an error.
+ return null;
+ } catch (Exception e) {
+ throw new CatalogException(e);
+ } finally {
+ if (client != null) {
+ client.release();
+ }
+ }
+ return builder.build();
+ }
@Override
public final void addFunction(final FunctionDesc func) throws CatalogException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
index 725f665..32ab674 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java
@@ -24,10 +24,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionKey;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
@@ -40,8 +39,11 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.List;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
import static org.junit.Assert.*;
/**
@@ -232,11 +234,12 @@ public class TestHCatalogStore {
org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema();
expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4);
+ expressionSchema.addColumn("n_date", TajoDataTypes.Type.TEXT);
PartitionMethodDesc partitions = new PartitionMethodDesc(
DB_NAME,
NATION,
- CatalogProtos.PartitionType.COLUMN, expressionSchema.getColumn(0).getQualifiedName(), expressionSchema);
+ CatalogProtos.PartitionType.COLUMN, "n_nationkey,n_date", expressionSchema);
table.setPartitionMethod(partitions);
store.createTable(table.getProto());
@@ -250,18 +253,80 @@ public class TestHCatalogStore {
assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName());
}
-
Schema partitionSchema = table.getPartitionMethod().getExpressionSchema();
Schema partitionSchema1 = table1.getPartitionMethod().getExpressionSchema();
assertEquals(partitionSchema.size(), partitionSchema1.size());
+
for (int i = 0; i < partitionSchema.size(); i++) {
assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName());
}
+ testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101");
+ testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102");
+
+ testDropPartition(NATION, "n_nationkey=10/n_date=20150101");
+ testDropPartition(NATION, "n_nationkey=20/n_date=20150102");
+
+ CatalogProtos.PartitionDescProto partition = store.getPartition(DB_NAME, NATION, "n_nationkey=10/n_date=20150101");
+ assertNull(partition);
+
+ partition = store.getPartition(DB_NAME, NATION, "n_nationkey=20/n_date=20150102");
+ assertNull(partition);
+
store.dropTable(DB_NAME, NATION);
}
+  /**
+   * Adds one partition through {@code store.alterTable} with {@code ADD_PARTITION} and verifies
+   * that it can be read back with the same name, path and partition keys.
+   *
+   * @param uri table location; the partition directory is resolved beneath it
+   * @param tableName simple name of the table inside {@code DB_NAME}
+   * @param partitionName partition name in {@code column=value/column=value} form
+   */
+  private void testAddPartition(URI uri, String tableName, String partitionName) throws Exception {
+    AlterTableDesc alterTableDesc = new AlterTableDesc();
+    alterTableDesc.setTableName(DB_NAME + "." + tableName);
+    alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION);
+
+    Path path = new Path(uri.getPath(), partitionName);
+
+    PartitionDesc partitionDesc = new PartitionDesc();
+    partitionDesc.setPartitionName(partitionName);
+
+    String[] partitionNames = partitionName.split("/");
+    List<PartitionKey> partitionKeyList = new ArrayList<PartitionKey>(partitionNames.length);
+    for (String eachPartition : partitionNames) {
+      String[] columnAndValue = eachPartition.split("=");
+      partitionKeyList.add(new PartitionKey(columnAndValue[0], columnAndValue[1]));
+    }
+    partitionDesc.setPartitionKeys(partitionKeyList);
+    partitionDesc.setPath(path.toString());
+
+    alterTableDesc.setPartitionDesc(partitionDesc);
+
+    store.alterTable(alterTableDesc.getProto());
+
+    // Read back from the table under test (the tableName parameter), not a hard-coded table.
+    CatalogProtos.PartitionDescProto resultDesc = store.getPartition(DB_NAME, tableName, partitionName);
+    assertNotNull(resultDesc);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+    assertEquals(partitionName, resultDesc.getPartitionName());
+    assertEquals(uri.toString() + "/" + partitionName, resultDesc.getPath());
+    assertEquals(partitionNames.length, resultDesc.getPartitionKeysCount());
+
+    for (int i = 0; i < resultDesc.getPartitionKeysCount(); i++) {
+      CatalogProtos.PartitionKeyProto keyProto = resultDesc.getPartitionKeys(i);
+      String[] eachName = partitionNames[i].split("=");
+      assertEquals(eachName[0], keyProto.getColumnName());
+      assertEquals(eachName[1], keyProto.getPartitionValue());
+    }
+  }
+
+
+  /**
+   * Drops the named partition through {@code store.alterTable} with {@code DROP_PARTITION}.
+   *
+   * @param tableName simple name of the table inside {@code DB_NAME}
+   * @param partitionName name of the partition to drop
+   */
+  private void testDropPartition(String tableName, String partitionName) throws Exception {
+    PartitionDesc target = new PartitionDesc();
+    target.setPartitionName(partitionName);
+
+    AlterTableDesc dropRequest = new AlterTableDesc();
+    dropRequest.setTableName(DB_NAME + "." + tableName);
+    dropRequest.setAlterTableType(AlterTableType.DROP_PARTITION);
+    dropRequest.setPartitionDesc(target);
+
+    store.alterTable(dropRequest.getProto());
+  }
+
@Test
public void testGetAllTableNames() throws Exception{
TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet());
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index c34b4d2..e9fb177 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,6 +33,7 @@ import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.store.CatalogStore;
import org.apache.tajo.catalog.store.DerbyStore;
@@ -69,7 +70,6 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
*/
@ThreadSafe
public class CatalogServer extends AbstractService {
-
private final static String DEFAULT_NAMESPACE = "public";
private final static Log LOG = LogFactory.getLog(CatalogServer.class);
@@ -821,34 +821,90 @@ public class CatalogServer extends AbstractService {
}
+  /**
+   * Returns the descriptor of a single partition, identified by database, table and partition name.
+   *
+   * @param controller RPC controller; not used by this method
+   * @param request database name, table name and partition name of the partition to look up
+   * @return the matching partition descriptor
+   * @throws ServiceException if the database is a system database, or wrapping the failure when the
+   *         database, table, partition method or partition does not exist or the store call fails
+   */
@Override
- public BoolProto addPartitions(RpcController controller, PartitionsProto request) throws ServiceException {
- return ProtoUtil.TRUE;
- }
+  public PartitionDescProto getPartitionByPartitionName(RpcController controller, PartitionIdentifierProto request)
+      throws ServiceException {
+    String databaseName = request.getDatabaseName();
+    String tableName = request.getTableName();
+    String partitionName = request.getPartitionName();
- @Override
- public BoolProto addPartition(RpcController controller, PartitionDescProto request) throws ServiceException {
- return ProtoUtil.TRUE;
- }
+    if (metaDictionary.isSystemDatabase(databaseName)) {
+      throw new ServiceException(databaseName + " is a system database. It does not contain any partitioned tables.");
+    }
- @Override
- public PartitionDescProto getPartitionByPartitionName(RpcController controller, StringProto request)
- throws ServiceException {
- return null;
- }
+    rlock.lock();
+    try {
- @Override
- public PartitionsProto getPartitionsByTableName(RpcController controller,
- StringProto request)
- throws ServiceException {
- return null;
+      // Check each level in turn (database, table, partition method, partition) and fail fast.
+      if (!store.existDatabase(databaseName)) {
+        throw new NoSuchDatabaseException(databaseName);
+      }
+      if (!store.existTable(databaseName, tableName)) {
+        throw new NoSuchTableException(tableName);
+      }
+      if (!store.existPartitionMethod(databaseName, tableName)) {
+        throw new NoPartitionedTableException(databaseName, tableName);
+      }
+
+      PartitionDescProto partitionDesc = store.getPartition(databaseName, tableName, partitionName);
+      if (partitionDesc == null) {
+        throw new NoSuchPartitionException(databaseName, tableName, partitionName);
+      }
+      return partitionDesc;
+    } catch (Exception e) {
+      // Pass the throwable as the second argument so the stack trace is logged, not only e.toString().
+      LOG.error(e.getMessage(), e);
+      throw new ServiceException(e);
+    } finally {
+      rlock.unlock();
+    }
}
+  /**
+   * Returns the descriptors of all partitions of a table.
+   *
+   * @param controller RPC controller; not used by this method
+   * @param request database name and table name of the table whose partitions are listed
+   * @return a message holding every partition descriptor returned by the store
+   * @throws ServiceException if the database is a system database, or wrapping the failure when the
+   *         database, table or partition method does not exist or the store call fails
+   */
@Override
- public PartitionsProto delAllPartitions(RpcController controller, StringProto request)
- throws ServiceException {
- return null;
+  public PartitionsProto getPartitionsByTableName(RpcController controller, PartitionIdentifierProto request)
+      throws ServiceException {
+    String databaseName = request.getDatabaseName();
+    String tableName = request.getTableName();
+
+    if (metaDictionary.isSystemDatabase(databaseName)) {
+      throw new ServiceException(databaseName + " is a system database. It does not contain any partitioned tables.");
+    }
+
+    rlock.lock();
+    try {
+      // Check each level in turn (database, table, partition method) and fail fast.
+      if (!store.existDatabase(databaseName)) {
+        throw new NoSuchDatabaseException(databaseName);
+      }
+      if (!store.existTable(databaseName, tableName)) {
+        throw new NoSuchTableException(tableName);
+      }
+      if (!store.existPartitionMethod(databaseName, tableName)) {
+        throw new NoPartitionedTableException(databaseName, tableName);
+      }
+
+      List<PartitionDescProto> partitions = store.getPartitions(databaseName, tableName);
+      return PartitionsProto.newBuilder().addAllPartition(partitions).build();
+    } catch (Exception e) {
+      // Pass the throwable as the second argument so the stack trace is logged, not only e.toString().
+      LOG.error(e.getMessage(), e);
+      throw new ServiceException(e);
+    } finally {
+      rlock.unlock();
+    }
+  }
-
+
@Override
public GetTablePartitionsProto getAllPartitions(RpcController controller, NullProto request) throws ServiceException {
rlock.lock();
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
index 0ac0a54..1bb8bc5 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java
@@ -40,6 +40,7 @@ public class InfoSchemaMetadataDictionary {
TABLEOPTIONS,
TABLESTATS,
PARTITIONS,
+ PARTITION_KEYS,
CLUSTER,
MAX_TABLE;
}
@@ -60,6 +61,7 @@ public class InfoSchemaMetadataDictionary {
schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this));
schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this));
schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this));
+ schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITION_KEYS.ordinal(), new PartitionKeysTableDescriptor(this));
schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java
new file mode 100644
index 0000000..ea35cef
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.catalog.dictionary;
+
+import org.apache.tajo.common.TajoDataTypes.Type;
+
+/**
+ * Table descriptor for the {@code partition_keys} table, which exposes three columns:
+ * {@code partition_id}, {@code column_name} and {@code partition_value}.
+ */
+class PartitionKeysTableDescriptor extends AbstractTableDescriptor {
+
+  private static final String TABLE_NAME = "partition_keys";
+
+  // NOTE(review): the third ColumnDescriptor argument is 0 for every column; its meaning is not visible here.
+  private final ColumnDescriptor[] columnDescriptors = new ColumnDescriptor[] {
+      new ColumnDescriptor("partition_id", Type.INT4, 0),
+      new ColumnDescriptor("column_name", Type.TEXT, 0),
+      new ColumnDescriptor("partition_value", Type.TEXT, 0)
+  };
+
+  public PartitionKeysTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) {
+    super(metadataDictionary);
+  }
+
+  /** Returns the fixed table name, {@code partition_keys}. */
+  @Override
+  public String getTableNameString() {
+    return TABLE_NAME;
+  }
+
+  /** Returns the column descriptors declared above, in declaration order. */
+  @Override
+  protected ColumnDescriptor[] getColumnDescriptors() {
+    return columnDescriptors;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java
index d69c93e..a6725c0 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java
@@ -24,10 +24,9 @@ class PartitionsTableDescriptor extends AbstractTableDescriptor {
private static final String TABLENAME = "partitions";
private final ColumnDescriptor[] columns = new ColumnDescriptor[] {
- new ColumnDescriptor("pid", Type.INT4, 0),
+ new ColumnDescriptor("partition_id", Type.INT4, 0),
new ColumnDescriptor("tid", Type.INT4, 0),
new ColumnDescriptor("partition_name", Type.TEXT, 0),
- new ColumnDescriptor("ordinal_position", Type.INT4, 0),
new ColumnDescriptor("path", Type.TEXT, 0)
};
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
index be6bf1c..518b499 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
@@ -973,7 +973,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
String databaseName = splitted[0];
String tableName = splitted[1];
-
+ String partitionName = null;
+ CatalogProtos.PartitionDescProto partitionDesc = null;
try {
int databaseId = getDatabaseId(databaseName);
@@ -998,6 +999,22 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
addNewColumn(tableId, alterTableDescProto.getAddColumn());
break;
+ case ADD_PARTITION:
+ partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc != null) {
+ throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
+ }
+ addPartition(tableId, alterTableDescProto.getPartitionDesc());
+ break;
+ case DROP_PARTITION:
+ partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
+ partitionDesc = getPartition(databaseName, tableName, partitionName);
+ if(partitionDesc == null) {
+ throw new NoSuchPartitionException(databaseName, tableName, partitionName);
+ }
+ dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName());
+ break;
default:
}
} catch (SQLException sqlException) {
@@ -1158,6 +1175,120 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
}
+  /**
+   * Inserts a partition row for the given table and, when the descriptor carries partition keys,
+   * one partition-key row per key.
+   *
+   * @param tableId id of the table that owns the partition
+   * @param partition partition name, path and partition keys to store
+   * @throws CatalogException if any of the SQL statements fails
+   */
+  public void addPartition(int tableId, CatalogProtos.PartitionDescProto partition) throws CatalogException {
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+    final String ADD_PARTITION_SQL =
+        "INSERT INTO " + TB_PARTTIONS
+        + " (" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?,?,?)";
+
+    final String ADD_PARTITION_KEYS_SQL =
+        "INSERT INTO " + TB_PARTTION_KEYS + " (" + COL_PARTITIONS_PK + ", " + COL_COLUMN_NAME + ", "
+        + COL_PARTITION_VALUE + ") VALUES (?,?,?)";
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(ADD_PARTITION_SQL);
+      }
+
+      conn = getConnection();
+      pstmt = conn.prepareStatement(ADD_PARTITION_SQL);
+
+      pstmt.setInt(1, tableId);
+      pstmt.setString(2, partition.getPartitionName());
+      pstmt.setString(3, partition.getPath());
+      pstmt.executeUpdate();
+
+      if (partition.getPartitionKeysCount() > 0) {
+        // Close the first statement before the variable is reused. The finally block only closes
+        // whatever pstmt points at last, so without this the partition INSERT statement would leak.
+        CatalogUtil.closeQuietly(pstmt);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ADD_PARTITION_KEYS_SQL);
+        }
+
+        pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL);
+        // The key rows reference the partition row inserted above by its generated id.
+        int partitionId = getPartitionId(tableId, partition.getPartitionName());
+        addPartitionKeys(pstmt, partitionId, partition);
+        pstmt.executeBatch();
+      }
+    } catch (SQLException se) {
+      throw new CatalogException(se);
+    } finally {
+      CatalogUtil.closeQuietly(pstmt);
+    }
+  }
+
+  /**
+   * Looks up the id of a partition by owning table id and partition name.
+   *
+   * @param tableId id of the table that owns the partition
+   * @param partitionName name of the partition
+   * @return the partition id, or -1 when no matching row exists
+   * @throws CatalogException if the query fails
+   */
+  public int getPartitionId(int tableId, String partitionName) throws CatalogException {
+    PreparedStatement statement = null;
+    ResultSet rows = null;
+
+    try {
+      final String query = "SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS
+          + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? ";
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(query);
+      }
+
+      statement = getConnection().prepareStatement(query);
+      statement.setInt(1, tableId);
+      statement.setString(2, partitionName);
+      rows = statement.executeQuery();
+
+      // -1 signals "no such partition"; the finally block still runs before returning.
+      return rows.next() ? rows.getInt(1) : -1;
+    } catch (SQLException se) {
+      throw new CatalogException(se);
+    } finally {
+      CatalogUtil.closeQuietly(statement, rows);
+    }
+  }
+
+  /**
+   * Queues one batch entry per partition key on the given statement; the caller executes the batch.
+   *
+   * @param pstmt statement with three parameters: partition id, column name, partition value
+   * @param partitionId id of the partition that owns the keys
+   * @param partition descriptor whose partition keys are written
+   * @throws SQLException if binding a parameter or adding a batch entry fails
+   */
+  private void addPartitionKeys(PreparedStatement pstmt, int partitionId, PartitionDescProto partition) throws
+      SQLException {
+    for (PartitionKeyProto key : partition.getPartitionKeysList()) {
+      pstmt.setInt(1, partitionId);
+      pstmt.setString(2, key.getColumnName());
+      pstmt.setString(3, key.getPartitionValue());
+
+      pstmt.addBatch();
+      pstmt.clearParameters();
+    }
+  }
+
+
+  /**
+   * Deletes a partition and its partition-key rows.
+   *
+   * @param tableId id of the table that owns the partition
+   * @param partitionName name of the partition to delete
+   * @throws CatalogException if looking up the partition id or either DELETE statement fails
+   */
+  private void dropPartition(int tableId, String partitionName) throws CatalogException {
+    Connection conn = null;
+    PreparedStatement pstmt = null;
+
+    try {
+      int partitionId = getPartitionId(tableId, partitionName);
+
+      String sqlDeletePartitionKeys = "DELETE FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
+      String sqlDeletePartition = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sqlDeletePartitionKeys);
+      }
+
+      conn = getConnection();
+
+      // Key rows are deleted first, then the partition row they refer to.
+      pstmt = conn.prepareStatement(sqlDeletePartitionKeys);
+      pstmt.setInt(1, partitionId);
+      pstmt.executeUpdate();
+
+      // Close the first statement before the variable is reused. The finally block only closes
+      // whatever pstmt points at last, so without this the first DELETE statement would leak.
+      CatalogUtil.closeQuietly(pstmt);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(sqlDeletePartition);
+      }
+
+      pstmt = conn.prepareStatement(sqlDeletePartition);
+      pstmt.setInt(1, partitionId);
+      pstmt.executeUpdate();
+
+    } catch (SQLException se) {
+      throw new CatalogException(se);
+    } finally {
+      CatalogUtil.closeQuietly(pstmt);
+    }
+  }
+
private int getDatabaseId(String databaseName) throws SQLException {
String sql = String.format("SELECT DB_ID from %s WHERE DB_NAME = ?", TB_DATABASES);
@@ -1260,6 +1391,19 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
pstmt.executeUpdate();
pstmt.close();
+ sql = "DELETE FROM " + TB_PARTTION_KEYS
+ + " WHERE " + COL_PARTITIONS_PK
+ + " IN (SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + "= ? )";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setInt(1, tableId);
+ pstmt.executeUpdate();
+ pstmt.close();
+
sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + " = ? ";
if (LOG.isDebugEnabled()) {
@@ -1698,66 +1842,14 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
return columns;
}
- private static final String ADD_PARTITION_SQL =
- "INSERT INTO " + TB_PARTTIONS + " (TID, PARTITION_NAME, ORDINAL_POSITION, PATH) VALUES (?,?,?,?)";
-
-
- @Override
- public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException {
- Connection conn = null;
- PreparedStatement pstmt = null;
-
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug(ADD_PARTITION_SQL);
- }
-
- String databaseName = partitionsProto.getTableIdentifier().getDatabaseName();
- String tableName = partitionsProto.getTableIdentifier().getTableName();
-
- int databaseId = getDatabaseId(databaseName);
- int tableId = getTableId(databaseId, databaseName, tableName);
-
- conn = getConnection();
- pstmt = conn.prepareStatement(ADD_PARTITION_SQL);
-
- for (CatalogProtos.PartitionDescProto partition : partitionsProto.getPartitionList()) {
- addPartitionInternal(pstmt, tableId, partition);
- }
- pstmt.executeBatch();
- conn.commit();
- } catch (SQLException se) {
- if (conn != null) {
- try {
- conn.rollback();
- } catch (SQLException e) {
- LOG.error(e, e);
- }
- }
- throw new CatalogException(se);
- } finally {
- CatalogUtil.closeQuietly(pstmt);
- }
- }
-
- private static void addPartitionInternal(PreparedStatement pstmt, int tableId, PartitionDescProto partition) throws
- SQLException {
- pstmt.setInt(1, tableId);
- pstmt.setString(2, partition.getPartitionName());
- pstmt.setInt(3, partition.getOrdinalPosition());
- pstmt.setString(4, partition.getPath());
- pstmt.addBatch();
- pstmt.clearParameters();
- }
-
@Override
public void addPartitionMethod(CatalogProtos.PartitionMethodProto proto) throws CatalogException {
Connection conn = null;
PreparedStatement pstmt = null;
try {
- String sql = "INSERT INTO " + TB_PARTITION_METHODS + " (TID, PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) " +
- "VALUES (?,?,?,?)";
+ String sql = "INSERT INTO " + TB_PARTITION_METHODS
+ + " (" + COL_TABLES_PK + ", PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) VALUES (?,?,?,?)";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
@@ -1789,15 +1881,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
PreparedStatement pstmt = null;
try {
- String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_NAME + " = ? ";
+ String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_PK + " = ? ";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
+ int databaseId = getDatabaseId(databaseName);
+ int tableId = getTableId(databaseId, databaseName, tableName);
+
conn = getConnection();
pstmt = conn.prepareStatement(sql);
- pstmt.setString(1, tableName);
+ pstmt.setInt(1, tableId);
pstmt.executeUpdate();
} catch (SQLException se) {
throw new CatalogException(se);
@@ -1815,15 +1910,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
try {
String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS +
- " WHERE " + COL_TABLES_NAME + " = ? ";
+ " WHERE " + COL_TABLES_PK + " = ? ";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
+ int databaseId = getDatabaseId(databaseName);
+ int tableId = getTableId(databaseId, databaseName, tableName);
+
conn = getConnection();
pstmt = conn.prepareStatement(sql);
- pstmt.setString(1, tableName);
+ pstmt.setInt(1, tableId);
res = pstmt.executeQuery();
if (res.next()) {
@@ -1848,15 +1946,18 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
try {
String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS +
- " WHERE " + COL_TABLES_NAME + "= ?";
+ " WHERE " + COL_TABLES_PK + "= ?";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
+ int databaseId = getDatabaseId(databaseName);
+ int tableId = getTableId(databaseId, databaseName, tableName);
+
conn = getConnection();
pstmt = conn.prepareStatement(sql);
- pstmt.setString(1, tableName);
+ pstmt.setInt(1, tableId);
res = pstmt.executeQuery();
exist = res.next();
@@ -1869,90 +1970,113 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
}
@Override
- public void addPartition(String databaseName, String tableName,
- CatalogProtos.PartitionDescProto partition) throws CatalogException {
+ public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName,
+ String partitionName) throws CatalogException {
Connection conn = null;
+ ResultSet res = null;
PreparedStatement pstmt = null;
+ PartitionDescProto.Builder builder = null;
try {
+ String sql = "SELECT PATH, " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS +
+ " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? ";
+
if (LOG.isDebugEnabled()) {
- LOG.debug(ADD_PARTITION_SQL);
+ LOG.debug(sql);
}
int databaseId = getDatabaseId(databaseName);
int tableId = getTableId(databaseId, databaseName, tableName);
conn = getConnection();
- pstmt = conn.prepareStatement(ADD_PARTITION_SQL);
- addPartitionInternal(pstmt, tableId, partition);
- pstmt.executeUpdate();
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setInt(1, tableId);
+ pstmt.setString(2, partitionName);
+ res = pstmt.executeQuery();
+
+ if (res.next()) {
+ builder = PartitionDescProto.newBuilder();
+ builder.setPath(res.getString("PATH"));
+ builder.setPartitionName(partitionName);
+ setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder);
+ } else {
+ return null;
+ }
} catch (SQLException se) {
throw new CatalogException(se);
} finally {
- CatalogUtil.closeQuietly(pstmt);
+ CatalogUtil.closeQuietly(pstmt, res);
}
+ return builder.build();
}
- @Override
- public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException {
- // TODO
- throw new UnimplementedException("getPartition is not implemented");
- }
-
-
- @Override
- public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException {
- // TODO
- throw new UnimplementedException("getPartitions is not implemented");
- }
-
-
- @Override
- public void delPartition(String partitionName) throws CatalogException {
+ private void setPartitionKeys(int pid, PartitionDescProto.Builder partitionDesc) throws
+ CatalogException {
Connection conn = null;
+ ResultSet res = null;
PreparedStatement pstmt = null;
try {
- String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE PARTITION_NAME = ? ";
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(sql);
- }
+ String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE
+ + " FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? ";
conn = getConnection();
pstmt = conn.prepareStatement(sql);
- pstmt.setString(1, partitionName);
- pstmt.executeUpdate();
+ pstmt.setInt(1, pid);
+ res = pstmt.executeQuery();
+
+ while (res.next()) {
+ PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder();
+ builder.setColumnName(res.getString(COL_COLUMN_NAME));
+ builder.setPartitionValue(res.getString(COL_PARTITION_VALUE));
+ partitionDesc.addPartitionKeys(builder);
+ }
} catch (SQLException se) {
throw new CatalogException(se);
} finally {
- CatalogUtil.closeQuietly(pstmt);
+ CatalogUtil.closeQuietly(pstmt, res);
}
}
@Override
- public void dropPartitions(String tableName) throws CatalogException {
+ public List<PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException {
Connection conn = null;
+ ResultSet res = null;
PreparedStatement pstmt = null;
+ PartitionDescProto.Builder builder = null;
+ List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>();
try {
- String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_NAME + "= ? ";
+ String sql = "SELECT PATH, PARTITION_NAME, " + COL_PARTITIONS_PK + " FROM "
+ + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? ";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
}
+ int databaseId = getDatabaseId(databaseName);
+ int tableId = getTableId(databaseId, databaseName, tableName);
+
conn = getConnection();
pstmt = conn.prepareStatement(sql);
- pstmt.setString(1, tableName);
- pstmt.executeUpdate();
+ pstmt.setInt(1, tableId);
+ res = pstmt.executeQuery();
+
+ while (res.next()) {
+ builder = PartitionDescProto.newBuilder();
+ builder.setPath(res.getString("PATH"));
+ builder.setPartitionName(res.getString("PARTITION_NAME"));
+ setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder);
+ partitions.add(builder.build());
+ }
} catch (SQLException se) {
throw new CatalogException(se);
} finally {
- CatalogUtil.closeQuietly(pstmt);
+ CatalogUtil.closeQuietly(pstmt, res);
}
+ return partitions;
}
-
+
@Override
public List<TablePartitionProto> getAllPartitions() throws CatalogException {
Connection conn = null;
@@ -1962,20 +2086,20 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
List<TablePartitionProto> partitions = new ArrayList<TablePartitionProto>();
try {
- String sql = "SELECT PID, TID, PARTITION_NAME, ORDINAL_POSITION, PATH FROM " + TB_PARTTIONS;
+ String sql = "SELECT " + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", PARTITION_NAME, " +
+ " PATH FROM " + TB_PARTTIONS;
conn = getConnection();
stmt = conn.createStatement();
resultSet = stmt.executeQuery(sql);
while (resultSet.next()) {
TablePartitionProto.Builder builder = TablePartitionProto.newBuilder();
-
- builder.setPid(resultSet.getInt("PID"));
- builder.setTid(resultSet.getInt("TID"));
+
+ builder.setPartitionId(resultSet.getInt(COL_PARTITIONS_PK));
+ builder.setTid(resultSet.getInt(COL_TABLES_PK));
builder.setPartitionName(resultSet.getString("PARTITION_NAME"));
- builder.setOrdinalPosition(resultSet.getInt("ORDINAL_POSITION"));
builder.setPath(resultSet.getString("PATH"));
-
+
partitions.add(builder.build());
}
} catch (SQLException se) {
@@ -1983,7 +2107,7 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
} finally {
CatalogUtil.closeQuietly(stmt, resultSet);
}
-
+
return partitions;
}
@@ -2003,7 +2127,8 @@ public abstract class AbstractDBStore extends CatalogConstants implements Catalo
String sql = "INSERT INTO " + TB_INDEXES +
" (" + COL_DATABASES_PK + ", " + COL_TABLES_PK + ", INDEX_NAME, " +
- "COLUMN_NAME, DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) VALUES (?,?,?,?,?,?,?,?,?)";
+ "" + COL_COLUMN_NAME + ", DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) " +
+ "VALUES (?,?,?,?,?,?,?,?,?)";
if (LOG.isDebugEnabled()) {
LOG.debug(sql);
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java
index 6d0876f..be9727e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java
@@ -204,6 +204,19 @@ public abstract class AbstractMySQLMariaDBStore extends AbstractDBStore {
baseTableMaps.put(TB_PARTTIONS, true);
}
+ // PARTITION_KEYS
+ if (!baseTableMaps.get(TB_PARTTION_KEYS)) {
+ String sql = readSchemaFile("partition_params.sql");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql.toString());
+ }
+
+ stmt.executeUpdate(sql.toString());
+ LOG.info("Table '" + TB_PARTTION_KEYS + "' is created.");
+ baseTableMaps.put(TB_PARTTION_KEYS, true);
+ }
+
insertSchemaVersion();
} catch (SQLException se) {
@@ -270,6 +283,7 @@ public abstract class AbstractMySQLMariaDBStore extends AbstractDBStore {
baseTableMaps.put(TB_INDEXES, false);
baseTableMaps.put(TB_PARTITION_METHODS, false);
baseTableMaps.put(TB_PARTTIONS, false);
+ baseTableMaps.put(TB_PARTTION_KEYS, false);
if (res.wasNull())
return false;
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
index ed6fedc..57ee74f 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java
@@ -102,25 +102,17 @@ public interface CatalogStore extends Closeable {
/************************** PARTITIONS *****************************/
- void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException;
-
- void addPartition(String databaseName, String tableName,
- CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException;
-
/**
* Get all partitions of a table
* @param tableName the table name
* @return
* @throws CatalogException
*/
- CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException;
+ List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException;
- CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException;
+ CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName,
+ String partitionName) throws CatalogException;
- void delPartition(String partitionName) throws CatalogException;
-
- void dropPartitions(String tableName) throws CatalogException;
-
List<TablePartitionProto> getAllPartitions() throws CatalogException;
/**************************** INDEX *******************************/
http://git-wip-us.apache.org/repos/asf/tajo/blob/cad54428/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
index e37efe6..470f09d 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java
@@ -29,6 +29,7 @@ import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.FunctionDesc;
import org.apache.tajo.catalog.exception.*;
+import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto;
import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto;
@@ -58,6 +59,7 @@ public class MemStore implements CatalogStore {
private final Map<String, CatalogProtos.FunctionDescProto> functions = Maps.newHashMap();
private final Map<String, Map<String, IndexDescProto>> indexes = Maps.newHashMap();
private final Map<String, Map<String, IndexDescProto>> indexesByColumn = Maps.newHashMap();
+ private final Map<String, Map<String, CatalogProtos.PartitionDescProto>> partitions = Maps.newHashMap();
public MemStore(Configuration conf) {
}
@@ -67,6 +69,7 @@ public class MemStore implements CatalogStore {
databases.clear();
functions.clear();
indexes.clear();
+ partitions.clear();
}
@Override
@@ -270,6 +273,8 @@ public class MemStore implements CatalogStore {
final CatalogProtos.TableDescProto tableDescProto = database.get(tableName);
CatalogProtos.TableDescProto newTableDescProto;
CatalogProtos.SchemaProto schemaProto;
+ String partitionName = null;
+ CatalogProtos.PartitionDescProto partitionDesc = null;
switch (alterTableDescProto.getAlterTableType()) {
case RENAME_TABLE:
@@ -304,11 +309,52 @@ public class MemStore implements CatalogStore {
newTableDescProto = tableDescProto.toBuilder().setSchema(newSchemaProto).build();
database.put(tableName, newTableDescProto);
break;
+ case ADD_PARTITION:
+ partitionDesc = alterTableDescProto.getPartitionDesc();
+ partitionName = partitionDesc.getPartitionName();
+
+ if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) {
+ throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
+ } else {
+ CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+ builder.setPartitionName(partitionName);
+ builder.setPath(partitionDesc.getPath());
+
+ // Copy every partition key into the new descriptor. The keys must be appended with
+ // addPartitionKeys(): the builder's repeated field starts out empty, so the indexed
+ // setPartitionKeys(i, ...) would throw IndexOutOfBoundsException on the first key.
+ for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) {
+ CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
+ keyBuilder.setColumnName(eachKey.getColumnName());
+ keyBuilder.setPartitionValue(eachKey.getPartitionValue());
+ builder.addPartitionKeys(keyBuilder.build());
+ }
+
+ // Register the partition under its table, creating the per-table map on first use.
+ Map<String, CatalogProtos.PartitionDescProto> protoMap = partitions.get(tableName);
+ if (protoMap == null) {
+ protoMap = Maps.newHashMap();
+ partitions.put(tableName, protoMap);
+ }
+ protoMap.put(partitionName, builder.build());
+ }
+ break;
+ case DROP_PARTITION:
+ partitionDesc = alterTableDescProto.getPartitionDesc();
+ partitionName = partitionDesc.getPartitionName();
+ // 'partitions' is keyed by table name, so the partition has to be removed from the table's own
+ // map; a table that has no such partition is reported as NoSuchPartitionException.
+ if (!partitions.containsKey(tableName) || !partitions.get(tableName).containsKey(partitionName)) {
+ throw new NoSuchPartitionException(databaseName, tableName, partitionName);
+ } else {
+ partitions.get(tableName).remove(partitionName);
+ }
+ break;
default:
- //TODO
}
}
+
private int getIndexOfColumnToBeRenamed(List<CatalogProtos.ColumnProto> fieldList, String columnName) {
int fieldCount = fieldList.size();
for (int index = 0; index < fieldCount; index++) {
@@ -498,39 +544,50 @@ public class MemStore implements CatalogStore {
}
@Override
- public void addPartitions(CatalogProtos.PartitionsProto partitionDescList) throws CatalogException {
- throw new RuntimeException("not supported!");
- }
+ public List<CatalogProtos.PartitionDescProto> getPartitions(String databaseName, String tableName) throws CatalogException {
+ List<CatalogProtos.PartitionDescProto> protos = new ArrayList<CatalogProtos.PartitionDescProto>();
- @Override
- public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto
- partitionDescProto) throws CatalogException {
- throw new RuntimeException("not supported!");
+ if (partitions.containsKey(tableName)) {
+ for (CatalogProtos.PartitionDescProto proto : partitions.get(tableName).values()) {
+ protos.add(proto);
+ }
+ }
+ return protos;
}
@Override
- public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException {
- throw new RuntimeException("not supported!");
+ public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName,
+ String partitionName) throws CatalogException {
+ Map<String, CatalogProtos.PartitionDescProto> tablePartitions = partitions.get(tableName); // keyed by table name
+ if (tablePartitions == null || !tablePartitions.containsKey(partitionName)) {
+ throw new NoSuchPartitionException(partitionName);
+ }
+ return tablePartitions.get(partitionName);
+ }
- @Override
- public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException {
- throw new RuntimeException("not supported!");
- }
+ public List<TablePartitionProto> getAllPartitions() throws CatalogException {
+ List<TablePartitionProto> protos = new ArrayList<TablePartitionProto>();
+ Set<String> tables = partitions.keySet();
+ for (String table : tables) {
+ Map<String, CatalogProtos.PartitionDescProto> entryMap = partitions.get(table);
+ for (Map.Entry<String, CatalogProtos.PartitionDescProto> proto : entryMap.entrySet()) {
+ CatalogProtos.PartitionDescProto partitionDescProto = proto.getValue();
- @Override
- public void delPartition(String partitionName) throws CatalogException {
- throw new RuntimeException("not supported!");
- }
+ TablePartitionProto.Builder builder = TablePartitionProto.newBuilder();
- @Override
- public void dropPartitions(String tableName) throws CatalogException {
- throw new RuntimeException("not supported!");
- }
-
- @Override
- public List<TablePartitionProto> getAllPartitions() throws CatalogException {
- throw new UnsupportedOperationException();
+ builder.setPartitionName(partitionDescProto.getPartitionName());
+ builder.setPath(partitionDescProto.getPath());
+
+ // PARTITION_ID and TID are always required. In every CatalogStore other than MemStore,
+ // each partition has a PARTITION_ID and a TID. MemStore does not hold these values because
+ // it is implemented for test purposes, so both are simply set to 0 here.
+ builder.setPartitionId(0);
+ builder.setTid(0);
+
+ protos.add(builder.build());
+ }
+ }
+ return protos;
}
/* (non-Javadoc)