You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by bl...@apache.org on 2015/09/24 07:42:09 UTC

tajo git commit: TAJO-1673: Implement recover partitions.

Repository: tajo
Updated Branches:
  refs/heads/master 5defb26c4 -> 5af330d22


TAJO-1673: Implement recover partitions.

Closes #626


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/5af330d2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/5af330d2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/5af330d2

Branch: refs/heads/master
Commit: 5af330d228991b9a2a7a9ce49979fd6feb84d464
Parents: 5defb26
Author: JaeHwa Jung <bl...@apache.org>
Authored: Thu Sep 24 14:36:33 2015 +0900
Committer: JaeHwa Jung <bl...@apache.org>
Committed: Thu Sep 24 14:36:33 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../apache/tajo/algebra/AlterTableOpType.java   |   2 +-
 .../tajo/engine/planner/TestLogicalPlanner.java |  19 +++
 .../tajo/engine/query/TestAlterTable.java       |  95 +++++++++++++
 .../alter_table_drop_partition1.sql             |   2 +-
 .../alter_table_drop_partition2.sql             |   2 +-
 .../create_partitioned_table2.sql               |   2 +
 .../alter_table_repair_partition_1.sql          |   1 +
 .../alter_table_repair_partition_1.result       |   8 ++
 .../apache/tajo/master/exec/DDLExecutor.java    | 138 +++++++++++++++++--
 .../org/apache/tajo/parser/sql/SQLAnalyzer.java |   6 +
 .../main/sphinx/sql_language/alter_table.rst    |  19 ++-
 .../rewrite/rules/PartitionedTableRewriter.java |   4 +-
 .../plan/serder/LogicalNodeDeserializer.java    |   3 +
 .../tajo/plan/serder/LogicalNodeSerializer.java |   4 +
 tajo-plan/src/main/proto/Plan.proto             |   1 +
 .../org/apache/tajo/parser/sql/SQLLexer.g4      |   1 +
 .../org/apache/tajo/parser/sql/SQLParser.g4     |   2 +
 18 files changed, 296 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ea5f1de..433be87 100644
--- a/CHANGES
+++ b/CHANGES
@@ -681,6 +681,8 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1673: Implement recover partitions. (jaehwa)
+
     TAJO-1844: Eliminate explicit diamond expressions. 
     (Contributed by Dongkyu Hwangbo, committed by hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
----------------------------------------------------------------------
diff --git a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
index 679ab4b..89daef0 100644
--- a/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
+++ b/tajo-algebra/src/main/java/org/apache/tajo/algebra/AlterTableOpType.java
@@ -18,5 +18,5 @@
 package org.apache.tajo.algebra;
 
 public enum AlterTableOpType {
-  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY
+  RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION, SET_PROPERTY, REPAIR_PARTITION
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index e9220c3..5a0bd94 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -1261,6 +1261,25 @@ public class TestLogicalPlanner {
     return root.getChild();
   }
 
+  @Test
+  public final void testAlterTableRepairPartiton() throws TajoException {
+    QueryContext qc = createQueryContext();
+
+    String sql = "ALTER TABLE table1 REPAIR PARTITION";
+    Expr expr = sqlAnalyzer.parse(sql);
+    LogicalPlan rootNode = planner.createPlan(qc, expr);
+    LogicalNode plan = rootNode.getRootBlock().getRoot();
+    testJsonSerDerObject(plan);
+    assertEquals(NodeType.ROOT, plan.getType());
+    LogicalRootNode root = (LogicalRootNode) plan;
+    assertEquals(NodeType.ALTER_TABLE, root.getChild().getType());
+
+    AlterTableNode msckNode = root.getChild();
+
+    assertEquals(msckNode.getAlterTableOpType(), AlterTableOpType.REPAIR_PARTITION);
+    assertEquals(msckNode.getTableName(), "table1");
+  }
+
   String [] ALTER_PARTITIONS = {
     "ALTER TABLE partitioned_table ADD PARTITION (col1 = 1 , col2 = 2) LOCATION 'hdfs://xxx" +
       ".com/warehouse/partitioned_table/col1=1/col2=2'", //0

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
index 8339ea7..d10c0f2 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java
@@ -25,6 +25,10 @@ import org.apache.tajo.QueryTestCaseBase;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.UndefinedPartitionException;
+import org.apache.tajo.exception.UndefinedPartitionMethodException;
+import org.apache.tajo.exception.UndefinedTableException;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -110,5 +114,96 @@ public class TestAlterTable extends QueryTestCaseBase {
     assertNotNull(partitions);
     assertEquals(partitions.size(), 0);
     assertFalse(fs.exists(partitionPath));
+
+    catalog.dropTable(tableName);
+  }
+
+  @Test
+  public final void testAlterTableRepairPartition() throws Exception {
+    executeDDL("create_partitioned_table2.sql", null);
+
+    String simpleTableName = "partitioned_table2";
+    String tableName = CatalogUtil.buildFQName(getCurrentDatabase(), simpleTableName);
+    assertTrue(catalog.existsTable(tableName));
+
+    TableDesc tableDesc = catalog.getTableDesc(tableName);
+    assertEquals(tableDesc.getName(), tableName);
+    assertEquals(tableDesc.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN);
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getAllColumns().size(), 2);
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "col1");
+    assertEquals(tableDesc.getPartitionMethod().getExpressionSchema().getColumn(1).getSimpleName(), "col2");
+
+    ResultSet res = executeString(
+      "insert overwrite into " + simpleTableName + " select l_quantity, l_returnflag, l_orderkey, l_partkey " +
+      " from default.lineitem");
+    res.close();
+
+    res = executeString("select * from " + simpleTableName + " order by col1, col2, col3, col4");
+    String result = resultSetToString(res);
+    String expectedResult = "col3,col4,col1,col2\n" +
+      "-------------------------------\n" +
+      "17.0,N,1,1\n" +
+      "36.0,N,1,1\n" +
+      "38.0,N,2,2\n" +
+      "45.0,R,3,2\n" +
+      "49.0,R,3,3\n";
+
+    res.close();
+    assertEquals(expectedResult, result);
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    // Remove all partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 1 , col2 = 1)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 2 , col2 = 2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 2)").close();
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 3)").close();
+
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 0);
+
+    assertTrue(fs.exists(new Path(tableDesc.getUri())));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=1/col2=1")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=2/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=2")));
+    assertTrue(fs.isDirectory(new Path(tablePath.toUri() + "/col1=3/col2=3")));
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove just one of existing partitions
+    executeString("ALTER TABLE " + simpleTableName + " DROP PARTITION (col1 = 3 , col2 = 3)").close();
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Remove a partition directory from filesystem
+    fs.delete(new Path(tablePath.toUri() + "/col1=3/col2=3"), true);
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+
+    // Add abnormal directories
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col10=1/col20=1")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/col1=")));
+    assertTrue(fs.mkdirs(new Path(tablePath.toUri() + "/test")));
+    assertEquals(6, fs.listStatus(new Path(tablePath.toUri())).length);
+
+    executeString("ALTER TABLE " + simpleTableName + " REPAIR PARTITION").close();
+    verifyPartitionCount(getCurrentDatabase(), simpleTableName, 4);
+    catalog.dropTable(tableName);
+  }
+
+  private void verifyPartitionCount(String databaseName, String tableName, int expectedCount)
+    throws UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException,
+    UndefinedPartitionException {
+    List<CatalogProtos.PartitionDescProto> partitions = catalog.getPartitions(databaseName, tableName);
+    assertNotNull(partitions);
+    assertEquals(partitions.size(), expectedCount);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
index b5d672f..cc4d6dd 100644
--- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition1.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP PARTITION (col3 = 1 , col4 = 2) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
index 0d4c932..452164b 100644
--- a/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/alter_table_drop_partition2.sql
@@ -1 +1 @@
-ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2)
\ No newline at end of file
+ALTER TABLE partitioned_table DROP IF EXISTS PARTITION (col3 = 1 , col4 = 2) PURGE
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
new file mode 100644
index 0000000..0fc8094
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestAlterTable/create_partitioned_table2.sql
@@ -0,0 +1,2 @@
+create table partitioned_table2 (col3 float8, col4 text) USING text  WITH ('text.delimiter'='|')
+PARTITION by column(col1 int4, col2 int4)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
new file mode 100644
index 0000000..b65b0e6
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/queries/TestSQLAnalyzer/alter_table_repair_partition_1.sql
@@ -0,0 +1 @@
+ALTER TABLE table1 REPAIR PARTITION
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
new file mode 100644
index 0000000..daca3b3
--- /dev/null
+++ b/tajo-core-tests/src/test/resources/results/TestSQLAnalyzer/alter_table_repair_partition_1.result
@@ -0,0 +1,8 @@
+{
+  "OldTableName": "table1",
+  "AlterTableType": "REPAIR_PARTITION",
+  "IsPurge": false,
+  "IfNotExists": false,
+  "IfExists": false,
+  "OpType": "AlterTable"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
index 92e1775..dac99e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/DDLExecutor.java
@@ -23,25 +23,31 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.tajo.algebra.AlterTableOpType;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.PartitionKeyProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.exception.*;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.FileTablespace;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.Tablespace;
 import org.apache.tajo.storage.TablespaceManager;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
+import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -529,24 +535,138 @@ public class DDLExecutor {
         catalog.alterTable(CatalogUtil.addOrDropPartition(qualifiedName, alterTable.getPartitionColumns(),
             alterTable.getPartitionValues(), alterTable.getLocation(), AlterTableType.DROP_PARTITION));
 
-        // When dropping partition on an managed table, the data will be delete from file system.
-        if (!desc.isExternal()) {
+        // When dropping a partition on a table, its data will NOT be deleted if the 'PURGE' option is not specified.
+        if (alterTable.isPurge()) {
           deletePartitionPath(partitionDescProto);
-        } else {
-          // When dropping partition on an external table, the data in the table will NOT be deleted from the file
-          // system. But if PURGE is specified, the partition data will be deleted.
-          if (alterTable.isPurge()) {
-            deletePartitionPath(partitionDescProto);
-          }
         }
       }
-
+      break;
+    case REPAIR_PARTITION:
+      repairPartition(context, queryContext, alterTable);
       break;
     default:
       throw new InternalError("alterTable cannot handle such query: \n" + alterTable.toJson());
     }
   }
 
+  /**
+   * Run ALTER TABLE table_name REPAIR TABLE  statement.
+   * This will recovery all partitions which exists on table directory.
+   *
+   *
+   * @param context
+   * @param queryContext
+   * @param alterTable
+   * @throws IOException
+   */
+  public void repairPartition(TajoMaster.MasterContext context, final QueryContext queryContext,
+                         final AlterTableNode alterTable) throws IOException, TajoException {
+    final CatalogService catalog = context.getCatalog();
+    final String tableName = alterTable.getTableName();
+
+    String databaseName;
+    String simpleTableName;
+    if (CatalogUtil.isFQTableName(tableName)) {
+      String[] split = CatalogUtil.splitFQTableName(tableName);
+      databaseName = split[0];
+      simpleTableName = split[1];
+    } else {
+      databaseName = queryContext.getCurrentDatabase();
+      simpleTableName = tableName;
+    }
+
+    if (!catalog.existsTable(databaseName, simpleTableName)) {
+      throw new UndefinedTableException(alterTable.getTableName());
+    }
+
+    TableDesc tableDesc = catalog.getTableDesc(databaseName, simpleTableName);
+
+    if(tableDesc.getPartitionMethod() == null) {
+      throw new UndefinedPartitionMethodException(simpleTableName);
+    }
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(context.getConf());
+
+    PartitionMethodDesc partitionDesc = tableDesc.getPartitionMethod();
+    Schema partitionColumns = partitionDesc.getExpressionSchema();
+
+    // Get the array of path filter, accepting all partition paths.
+    PathFilter[] filters = PartitionedTableRewriter.buildAllAcceptingPathFilters(partitionColumns);
+
+    // loop from one to the number of partition columns
+    Path [] filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(tablePath, filters[0]));
+
+    // Get all file status matched to a ith level path filter.
+    for (int i = 1; i < partitionColumns.size(); i++) {
+      filteredPaths = PartitionedTableRewriter.toPathArray(fs.listStatus(filteredPaths, filters[i]));
+    }
+
+    // Find missing partitions from filesystem
+    List<PartitionDescProto> existingPartitions = catalog.getPartitions(databaseName, simpleTableName);
+    List<String> existingPartitionNames = TUtil.newList();
+    Path existingPartitionPath = null;
+
+    for(PartitionDescProto existingPartition : existingPartitions) {
+      existingPartitionPath = new Path(existingPartition.getPath());
+      existingPartitionNames.add(existingPartition.getPartitionName());
+      if (!fs.exists(existingPartitionPath) && LOG.isDebugEnabled()) {
+        LOG.debug("Partitions missing from Filesystem:" + existingPartition.getPartitionName());
+      }
+    }
+
+    // Find missing partitions from CatalogStore
+    List<PartitionDescProto> targetPartitions = TUtil.newList();
+    for(Path filteredPath : filteredPaths) {
+      PartitionDescProto targetPartition = getPartitionDesc(simpleTableName, filteredPath);
+      if (!existingPartitionNames.contains(targetPartition.getPartitionName())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Partitions not in CatalogStore:" + targetPartition.getPartitionName());
+        }
+        targetPartitions.add(targetPartition);
+      }
+    }
+
+    catalog.addPartitions(databaseName, simpleTableName, targetPartitions, true);
+
+    if (LOG.isDebugEnabled()) {
+      for(PartitionDescProto targetPartition: targetPartitions) {
+        LOG.debug("Repair: Added partition to CatalogStore " + tableName + ":" + targetPartition.getPartitionName());
+      }
+    }
+
+    LOG.info("Total added partitions to CatalogStore: " + targetPartitions.size());
+  }
+
+  private PartitionDescProto getPartitionDesc(String tableName, Path path) throws IOException {
+    String partitionPath = path.toString();
+
+    String partitionName = StringUtils.unescapePathName(partitionPath);
+    int startIndex = partitionPath.indexOf(tableName);
+    partitionName = partitionName.substring(startIndex + tableName.length() + 1, partitionPath.length());
+
+    CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
+    builder.setPartitionName(partitionName);
+
+    String[] partitionKeyPairs = partitionName.split("/");
+
+    for(int i = 0; i < partitionKeyPairs.length; i++) {
+      String partitionKeyPair = partitionKeyPairs[i];
+      String[] split = partitionKeyPair.split("=");
+
+      PartitionKeyProto.Builder keyBuilder = PartitionKeyProto.newBuilder();
+      keyBuilder.setColumnName(split[0]);
+      keyBuilder.setPartitionValue(split[1]);
+
+      builder.addPartitionKeys(keyBuilder.build());
+    }
+
+    builder.setPath(partitionPath);
+
+    return builder.build();
+  }
+
+
   private void deletePartitionPath(CatalogProtos.PartitionDescProto partitionDescProto) throws IOException {
     Path partitionPath = new Path(partitionDescProto.getPath());
     FileSystem fs = partitionPath.getFileSystem(context.getConf());

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
index 4526144..717366a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
+++ b/tajo-core/src/main/java/org/apache/tajo/parser/sql/SQLAnalyzer.java
@@ -1947,6 +1947,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
     final int PARTITION_MASK = 00000020;
     final int SET_MASK = 00000002;
     final int PROPERTY_MASK = 00010000;
+    final int REPAIR_MASK = 00000003;
 
     int val = 00000000;
 
@@ -1978,6 +1979,9 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
           case PROPERTY:
             val = val | PROPERTY_MASK;
             break;
+          case REPAIR:
+            val = val | REPAIR_MASK;
+            break;
           default:
             break;
         }
@@ -1989,6 +1993,8 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> {
   private AlterTableOpType evaluateAlterTableOperationTye(final int value) {
 
     switch (value) {
+      case 19:
+        return AlterTableOpType.REPAIR_PARTITION;
       case 65:
         return AlterTableOpType.RENAME_TABLE;
       case 73:

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
----------------------------------------------------------------------
diff --git a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
index ffc34d1..959ebcc 100644
--- a/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
+++ b/tajo-docs/src/main/sphinx/sql_language/alter_table.rst
@@ -96,4 +96,21 @@ You can use ``ALTER TABLE ADD PARTITION`` to add partitions to a table. The loca
   ALTER TABLE table1 DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )
   ALTER TABLE table1 DROP PARTITION (col1 = 'TAJO' ) PURGE
 
-You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This removes the data for a managed table and this doesn't remove the data for an external table. But if ``PURGE`` is specified for an external table, the partition data will be removed. The metadata is completely lost in all cases. An error is thrown if the partition for the table doesn't exists. You can use ``IF EXISTS`` to skip the error.
+You can use ``ALTER TABLE DROP PARTITION`` to drop a partition for a table. This doesn't remove the data for a table. But if ``PURGE`` is specified, the partition data will be removed. The metadata is completely lost in all cases. An error is thrown if the partition for the table doesn't exist. You can use ``IF EXISTS`` to skip the error.
+
+========================
+REPAIR PARTITION
+========================
+
+Tajo stores a list of partitions for each table in its catalog. If partitions are manually added to the distributed file system, the metastore is not aware of these partitions. Running the ``ALTER TABLE REPAIR PARTITION`` statement ensures that the tables are properly populated.
+
+*Synopsis*
+
+.. code-block:: sql
+
+  ALTER TABLE <table_name> REPAIR PARTITION
+
+.. note::
+
+  Even though an information of a partition is stored in the catalog, Tajo does not recover it when its partition directory doesn't exist in the file system.
+

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
index b5cd42b..5123fc4 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java
@@ -199,7 +199,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
    * @param partitionColumns The partition columns schema
    * @return The array of path filter, accpeting all partition paths.
    */
-  private static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
+  public static PathFilter [] buildAllAcceptingPathFilters(Schema partitionColumns) {
     Column target;
     PathFilter [] filters = new PathFilter[partitionColumns.size()];
     List<EvalNode> accumulatedFilters = Lists.newArrayList();
@@ -214,7 +214,7 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule {
     return filters;
   }
 
-  private static Path [] toPathArray(FileStatus[] fileStatuses) {
+  public static Path [] toPathArray(FileStatus[] fileStatuses) {
     Path [] paths = new Path[fileStatuses.length];
     for (int j = 0; j < fileStatuses.length; j++) {
       paths[j] = fileStatuses[j].getPath();

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
index 608fa4c..c75c3fd 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java
@@ -650,6 +650,9 @@ public class LogicalNodeDeserializer {
       alterTable.setPurge(alterPartition.getPurge());
       alterTable.setIfExists(alterPartition.getIfExists());
       break;
+    case REPAIR_PARTITION:
+      alterTable.setTableName(alterTableProto.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + alterTableProto.getSetType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
index a0f1fcc..3cf7d9e 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java
@@ -633,6 +633,10 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe
       partitionBuilder.setPurge(node.isPurge());
       alterTableBuilder.setAlterPartition(partitionBuilder);
       break;
+    case REPAIR_PARTITION:
+      alterTableBuilder.setSetType(PlanProto.AlterTableNode.Type.REPAIR_PARTITION);
+      alterTableBuilder.setTableName(node.getTableName());
+      break;
     default:
       throw new TajoRuntimeException(
           new NotImplementedException("Unknown SET type in ALTER TABLE: " + node.getAlterTableOpType().name()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-plan/src/main/proto/Plan.proto
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto
index 8a8ecb1..fa1deeb 100644
--- a/tajo-plan/src/main/proto/Plan.proto
+++ b/tajo-plan/src/main/proto/Plan.proto
@@ -302,6 +302,7 @@ message AlterTableNode {
     SET_PROPERTY = 3;
     ADD_PARTITION = 4;
     DROP_PARTITION = 5;
+    REPAIR_PARTITION = 6;    
   }
 
   message RenameTable {

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
----------------------------------------------------------------------
diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
index 896f627..ee61320 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLLexer.g4
@@ -290,6 +290,7 @@ RANK : R A N K;
 RECORD : R E C O R D;
 REGEXP : R E G E X P;
 RENAME : R E N A M E;
+REPAIR : R E P A I R;
 RESET : R E S E T;
 RLIKE : R L I K E;
 ROLLUP : R O L L U P;

http://git-wip-us.apache.org/repos/asf/tajo/blob/5af330d2/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
----------------------------------------------------------------------
diff --git a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4 b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
index c125352..e2693ea 100644
--- a/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
+++ b/tajo-sql-parser/src/main/antlr4/org/apache/tajo/parser/sql/SQLParser.g4
@@ -308,6 +308,7 @@ nonreserved_keywords
   | RECORD
   | REGEXP
   | RENAME
+  | REPAIR
   | RESET
   | RLIKE
   | ROLLUP
@@ -1624,6 +1625,7 @@ alter_table_statement
   | ALTER TABLE table_name ADD (if_not_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (LOCATION path=Character_String_Literal)?
   | ALTER TABLE table_name DROP (if_exists)? PARTITION LEFT_PAREN partition_column_value_list RIGHT_PAREN (PURGE)?
   | ALTER TABLE table_name SET PROPERTY property_list
+  | ALTER TABLE table_name REPAIR PARTITION
   ;
 
 partition_column_value_list