Posted to commits@hive.apache.org by da...@apache.org on 2016/12/28 00:33:59 UTC

hive git commit: HIVE-15448: ChangeManager (Daniel Dai, reviewed by Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master 212f18091 -> b558d49b0


HIVE-15448: ChangeManager (Daniel Dai, reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b558d49b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b558d49b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b558d49b

Branch: refs/heads/master
Commit: b558d49b03abf541dce093fd3f360a2e7b87de2e
Parents: 212f180
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Dec 27 16:33:28 2016 -0800
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Dec 27 16:33:28 2016 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  10 +
 .../hive/metastore/TestReplChangeManager.java   | 318 +++++++++++++++++++
 .../hadoop/hive/metastore/HiveMetaStore.java    |   2 +
 .../hive/metastore/ReplChangeManager.java       | 281 ++++++++++++++++
 .../apache/hadoop/hive/metastore/Warehouse.java |   4 +-
 5 files changed, 613 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b4e89b0..8bb7337 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -437,6 +437,16 @@ public class HiveConf extends Configuration {
         "with ${hive.scratch.dir.permission}."),
     REPLDIR("hive.repl.rootdir","/user/hive/repl/",
         "HDFS root dir for all replication dumps."),
+    REPLCMENABLED("hive.repl.cm.enabled", false,
+        "Turn on ChangeManager, so deleted files will go to cmrootdir."),
+    REPLCMDIR("hive.repl.cmrootdir","/user/hive/cmroot/",
+        "Root dir for ChangeManager, used for deleted files."),
+    REPLCMRETIAN("hive.repl.cm.retain","24h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Time to retain removed files in cmrootdir."),
+    REPLCMINTERVAL("hive.repl.cm.interval","3600s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Inteval for cmroot cleanup thread."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
new file mode 100644
index 0000000..0587221
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class TestReplChangeManager {
+  private static HiveMetaStoreClient client;
+  private static HiveConf hiveConf;
+  private static Warehouse warehouse;
+  private static MiniDFSCluster m_dfs;
+  private static String cmroot;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    m_dfs = new MiniDFSCluster.Builder(new Configuration()).numDataNodes(1).format(true).build();
+    hiveConf = new HiveConf(TestReplChangeManager.class);
+    hiveConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
+        "hdfs://" + m_dfs.getNameNode().getHostAndPort() + HiveConf.ConfVars.METASTOREWAREHOUSE.defaultStrVal);
+    hiveConf.setBoolean(HiveConf.ConfVars.REPLCMENABLED.varname, true);
+    cmroot = "hdfs://" + m_dfs.getNameNode().getHostAndPort() + "/cmroot";
+    hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
+    warehouse = new Warehouse(hiveConf);
+    try {
+      client = new HiveMetaStoreClient(hiveConf);
+    } catch (Throwable e) {
+      System.err.println("Unable to open the metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      m_dfs.shutdown();
+      client.close();
+    } catch (Throwable e) {
+      System.err.println("Unable to close metastore");
+      System.err.println(StringUtils.stringifyException(e));
+      throw e;
+    }
+  }
+
+  private Partition createPartition(String dbName, String tblName,
+      List<FieldSchema> columns, List<String> partVals, SerDeInfo serdeInfo) {
+    StorageDescriptor sd = new StorageDescriptor(columns, null,
+        "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+        "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+        false, 0, serdeInfo, null, null, null);
+    return new Partition(partVals, dbName, tblName, 0, 0, sd, null);
+  }
+
+  private void createFile(Path path, String content) throws IOException {
+    FSDataOutputStream output = path.getFileSystem(hiveConf).create(path);
+    output.writeChars(content);
+    output.close();
+  }
+
+  @Test
+  public void testRecyclePartTable() throws Exception {
+    // Create db1/t1/dt=20160101/part
+    //              /dt=20160102/part
+    //              /dt=20160103/part
+    // Test: recycle single partition (dt=20160101)
+    //       recycle table t1
+    String dbName = "db1";
+    client.dropDatabase(dbName, true, true);
+
+    Database db = new Database();
+    db.setName(dbName);
+    client.createDatabase(db);
+
+    String tblName = "t1";
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("foo", "string", ""));
+    columns.add(new FieldSchema("bar", "string", ""));
+
+    List<FieldSchema> partColumns = new ArrayList<FieldSchema>();
+    partColumns.add(new FieldSchema("dt", "string", ""));
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
+
+    StorageDescriptor sd
+      = new StorageDescriptor(columns, null,
+          "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+          "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+          false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<String, String>();
+
+    Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, partColumns, tableParameters, "", "", "");
+
+    client.createTable(tbl);
+
+    List<String> values = Arrays.asList("20160101");
+    Partition part1 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part1);
+
+    values = Arrays.asList("20160102");
+    Partition part2 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part2);
+
+    values = Arrays.asList("20160103");
+    Partition part3 = createPartition(dbName, tblName, columns, values, serdeInfo);
+    client.add_partition(part3);
+
+    Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part");
+    createFile(part1Path, "p1");
+    String path1Sig = ReplChangeManager.getCksumString(part1Path, hiveConf);
+
+    Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part");
+    createFile(part2Path, "p2");
+    String path2Sig = ReplChangeManager.getCksumString(part2Path, hiveConf);
+
+    Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part");
+    createFile(part3Path, "p3");
+    String path3Sig = ReplChangeManager.getCksumString(part3Path, hiveConf);
+
+    Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
+    Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Assert.assertTrue(part3Path.getFileSystem(hiveConf).exists(part3Path));
+
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    // verify cm.recycle(db, table, part) api moves file to cmroot dir
+    int ret = cm.recycle(db, tbl, part1);
+    Assert.assertEquals(ret, 0);
+    ret = cm.recycle(db, tbl, part2);
+    Assert.assertEquals(ret, 0);
+    ret = cm.recycle(db, tbl, part3);
+    Assert.assertEquals(ret, 0);
+
+    Assert.assertFalse(part1Path.getFileSystem(hiveConf).exists(part1Path));
+    Assert.assertFalse(part2Path.getFileSystem(hiveConf).exists(part2Path));
+    Assert.assertFalse(part3Path.getFileSystem(hiveConf).exists(part3Path));
+
+    client.dropPartition(dbName, tblName, Arrays.asList("20160101"));
+
+    Path cmPart1Path = ReplChangeManager.getCMPath(part1Path, hiveConf, path1Sig);
+    Assert.assertTrue(cmPart1Path.getFileSystem(hiveConf).exists(cmPart1Path));
+
+    client.dropTable(dbName, tblName);
+
+    Path cmPart2Path = ReplChangeManager.getCMPath(part2Path, hiveConf, path2Sig);
+    Assert.assertTrue(cmPart2Path.getFileSystem(hiveConf).exists(cmPart2Path));
+
+    Path cmPart3Path = ReplChangeManager.getCMPath(part3Path, hiveConf, path3Sig);
+    Assert.assertTrue(cmPart3Path.getFileSystem(hiveConf).exists(cmPart3Path));
+
+    client.dropDatabase(dbName, true, true);
+  }
+
+  @Test
+  public void testRecycleNonPartTable() throws Exception {
+    // Create db2/t1/part1
+    //              /part2
+    //              /part3
+    // Test: recycle single file (part1)
+    //       recycle table t1
+    String dbName = "db2";
+    client.dropDatabase(dbName, true, true);
+
+    Database db = new Database();
+    db.setName(dbName);
+    client.createDatabase(db);
+
+    String tblName = "t1";
+    List<FieldSchema> columns = new ArrayList<FieldSchema>();
+    columns.add(new FieldSchema("foo", "string", ""));
+    columns.add(new FieldSchema("bar", "string", ""));
+
+    SerDeInfo serdeInfo = new SerDeInfo("LBCSerDe", LazyBinaryColumnarSerDe.class.getCanonicalName(), new HashMap<String, String>());
+
+    StorageDescriptor sd
+      = new StorageDescriptor(columns, null,
+          "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
+          "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
+          false, 0, serdeInfo, null, null, null);
+    Map<String, String> tableParameters = new HashMap<String, String>();
+
+    Table tbl = new Table(tblName, dbName, "", 0, 0, 0, sd, null, tableParameters, "", "", "");
+
+    client.createTable(tbl);
+
+    Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1");
+    createFile(filePath1, "f1");
+    String fileSig1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
+
+    Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2");
+    createFile(filePath2, "f2");
+    String fileSig2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
+
+    Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3");
+    createFile(filePath3, "f3");
+    String fileSig3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
+
+
+    Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
+    Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    Assert.assertTrue(filePath3.getFileSystem(hiveConf).exists(filePath3));
+
+    ReplChangeManager cm = ReplChangeManager.getInstance(hiveConf, warehouse);
+    // verify cm.recycle(Path) api moves file to cmroot dir
+    cm.recycle(filePath1);
+    Assert.assertFalse(filePath1.getFileSystem(hiveConf).exists(filePath1));
+
+    Path cmPath1 = ReplChangeManager.getCMPath(filePath1, hiveConf, fileSig1);
+    Assert.assertTrue(cmPath1.getFileSystem(hiveConf).exists(cmPath1));
+
+    // verify cm.recycle(db, table) api moves file to cmroot dir
+    int ret = cm.recycle(db, tbl);
+    Assert.assertEquals(ret, 0);
+
+    Assert.assertFalse(filePath2.getFileSystem(hiveConf).exists(filePath2));
+    Assert.assertFalse(filePath3.getFileSystem(hiveConf).exists(filePath3));
+
+    client.dropTable(dbName, tblName);
+
+    Path cmPath2 = ReplChangeManager.getCMPath(filePath2, hiveConf, fileSig2);
+    Assert.assertTrue(cmPath2.getFileSystem(hiveConf).exists(cmPath2));
+
+    Path cmPath3 = ReplChangeManager.getCMPath(filePath3, hiveConf, fileSig3);
+    Assert.assertTrue(cmPath3.getFileSystem(hiveConf).exists(cmPath3));
+
+    client.dropDatabase(dbName, true, true);
+  }
+
+  @Test
+  public void testClearer() throws Exception {
+    FileSystem fs = new Path(cmroot).getFileSystem(hiveConf);
+    long now = System.currentTimeMillis();
+    Path dirDb = new Path(cmroot, "db3");
+    fs.mkdirs(dirDb);
+    Path dirTbl1 = new Path(dirDb, "tbl1");
+    fs.mkdirs(dirTbl1);
+    Path part11 = new Path(dirTbl1, "part1");
+    fs.create(part11).close();
+    fs.setTimes(part11, now - 86400*1000*2, now - 86400*1000*2);
+    Path part12 = new Path(dirTbl1, "part2");
+    fs.create(part12).close();
+    Path dirTbl2 = new Path(dirDb, "tbl2");
+    fs.mkdirs(dirTbl2);
+    Path part21 = new Path(dirTbl2, "part1");
+    fs.create(part21).close();
+    fs.setTimes(part21, now - 86400*1000*2, now - 86400*1000*2);
+    Path part22 = new Path(dirTbl2, "part2");
+    fs.create(part22).close();
+    Path dirTbl3 = new Path(dirDb, "tbl3");
+    fs.mkdirs(dirTbl3);
+    Path part31 = new Path(dirTbl3, "part1");
+    fs.create(part31).close();
+    fs.setTimes(part31, now - 86400*1000*2, now - 86400*1000*2);
+    Path part32 = new Path(dirTbl3, "part2");
+    fs.create(part32).close();
+    fs.setTimes(part32, now - 86400*1000*2, now - 86400*1000*2);
+
+    ReplChangeManager.scheduleCMClearer(hiveConf);
+
+    long start = System.currentTimeMillis();
+    long end;
+    boolean cleared = false;
+    do {
+      Thread.sleep(200);
+      end = System.currentTimeMillis();
+      if (end - start > 5000) {
+        Assert.fail("timeout, cmroot has not been cleared");
+      }
+      if (!part11.getFileSystem(hiveConf).exists(part11) &&
+          part12.getFileSystem(hiveConf).exists(part12) &&
+          !part21.getFileSystem(hiveConf).exists(part21) &&
+          part22.getFileSystem(hiveConf).exists(part22) &&
+          !dirTbl3.getFileSystem(hiveConf).exists(dirTbl3)) {
+        cleared = true;
+      }
+    } while (!cleared);
+  }
+}
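
A note on the arithmetic behind testClearer's expectations: the files aged
with setTimes are backdated by 86400*1000*2 ms (two days), which exceeds the
default hive.repl.cm.retain of 24h, so the clearer must delete them; part12
and part22 keep fresh timestamps and must survive; tbl3 loses both of its
files, becomes empty, and is removed as a directory. The per-file predicate
this relies on is implemented in CMClearer.processDir below:

    // delete a file once it has outlived the retention window
    boolean expired = now - file.getModificationTime() > secRetain * 1000;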

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 2892da3..121b825 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7238,6 +7238,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         } finally {
           startLock.unlock();
         }
+
+        ReplChangeManager.scheduleCMClearer(conf);
       }
     };
     t.setDaemon(true);
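
The clearer is scheduled from the metastore's startup thread and is a no-op
unless hive.repl.cm.enabled is true. Embedded-metastore tests can invoke the
same hook directly, as TestReplChangeManager.testClearer above does:

    ReplChangeManager.scheduleCMClearer(hiveConf);  // no-op when CM is disabled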

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
new file mode 100644
index 0000000..a49a80e
--- /dev/null
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReplChangeManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplChangeManager.class);
+  static private ReplChangeManager instance;
+
+  private static boolean inited = false;
+  private static boolean enabled = false;
+  private static Path cmroot;
+  private static HiveConf conf;
+  private static Warehouse wh;
+  private String user;
+  private String group;
+
+  public static ReplChangeManager getInstance(HiveConf conf, Warehouse wh) throws IOException {
+    if (instance == null) {
+      instance = new ReplChangeManager(conf, wh);
+    }
+    return instance;
+  }
+
+  ReplChangeManager(HiveConf conf, Warehouse wh) throws IOException {
+    if (!inited) {
+      if (conf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+        ReplChangeManager.enabled = true;
+        ReplChangeManager.cmroot = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+        ReplChangeManager.conf = conf;
+        ReplChangeManager.wh = wh;
+
+        FileSystem fs = cmroot.getFileSystem(conf);
+        // Create cmroot with permission 700 if it does not exist
+        if (!fs.exists(cmroot)) {
+          fs.mkdirs(cmroot);
+          fs.setPermission(cmroot, new FsPermission("700"));
+        }
+        UserGroupInformation usergroupInfo = UserGroupInformation.getCurrentUser();
+        user = usergroupInfo.getShortUserName();
+        group = usergroupInfo.getPrimaryGroupName();
+      }
+      inited = true;
+    }
+  }
+
+  /**
+   * Recycle a managed table: move all of its files to cmroot.
+   * @param db database the table belongs to
+   * @param table table to recycle
+   * @return number of files that failed to move
+   * @throws IOException
+   * @throws MetaException
+   */
+  public int recycle(Database db, Table table) throws IOException, MetaException {
+    if (!enabled) {
+      return 0;
+    }
+
+    Path tablePath = wh.getTablePath(db, table.getTableName());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    int failCount = 0;
+    for (FileStatus file : fs.listStatus(tablePath)) {
+      if (!recycle(file.getPath())) {
+        failCount++;
+      }
+    }
+    return failCount;
+  }
+
+  /**
+   * Recycle a partition of a managed table: move the partition's files to cmroot.
+   * @param db database the table belongs to
+   * @param table table the partition belongs to
+   * @param part partition to recycle
+   * @return number of files that failed to move
+   * @throws IOException
+   * @throws MetaException
+   */
+  public int recycle(Database db, Table table, Partition part) throws IOException, MetaException {
+    if (!enabled) {
+      return 0;
+    }
+
+    Map<String, String> pm = Warehouse.makeSpecFromValues(table.getPartitionKeys(), part.getValues());
+    Path partPath = wh.getPartitionPath(db, table.getTableName(), pm);
+    FileSystem fs = partPath.getFileSystem(conf);
+    int failCount = 0;
+    for (FileStatus file : fs.listStatus(partPath)) {
+      if (!recycle(file.getPath())) {
+        failCount++;
+      }
+    }
+    return failCount;
+  }
+
+  /**
+   * Recycle a single file (of a partition, or of the table if non-partitioned):
+   *   move the file to cmroot. Note the table must be a managed table.
+   * @param path file to recycle
+   * @return true if the move succeeded
+   * @throws IOException
+   * @throws MetaException
+   */
+  public boolean recycle(Path path) throws IOException, MetaException {
+    if (!enabled) {
+      return true;
+    }
+
+    Path cmPath = getCMPath(path, conf, getCksumString(path, conf));
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
+    }
+
+    FileSystem fs = path.getFileSystem(conf);
+
+    boolean succ = fs.rename(path, cmPath);
+    // Ignore if a file with the same content already exists in cmroot
+    // We might want to setXAttr for the new location in the future
+    if (!succ) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("A file with the same content as " + path.toString() + " already exists, ignore");
+      }
+    } else {
+      long now = System.currentTimeMillis();
+      fs.setTimes(cmPath, now, now);
+
+      // set the file owner to hive (or the id the metastore runs as)
+      fs.setOwner(cmPath, user, group);
+
+      // tag the original file name so we know where the file comes from
+      fs.setXAttr(cmPath, "user.original-loc", path.toString().getBytes());
+    }
+    return succ;
+  }
+
+  // Get checksum of a file
+  static public String getCksumString(Path path, Configuration conf) throws IOException {
+    // TODO: fs checksum only available on hdfs, need to
+    //       find a solution for other fs (e.g., local fs, s3, etc.)
+    FileSystem fs = path.getFileSystem(conf);
+    FileChecksum checksum = fs.getFileChecksum(path);
+    String checksumString = StringUtils.byteToHexString(
+        checksum.getBytes(), 0, checksum.getLength());
+    return checksumString;
+  }
+
+  /**
+   * Convert the path of a file inside a partition or table (if non-partitioned)
+   *   to a deterministic location under cmroot, so the user can retrieve the
+   *   file later from the original location plus the signature.
+   * @param path original path inside the partition or table
+   * @param conf
+   * @param signature unique signature of the file, from {@link #getCksumString(Path, Configuration)}
+   * @return
+   * @throws IOException
+   * @throws MetaException
+   */
+  static public Path getCMPath(Path path, Configuration conf, String signature)
+      throws IOException, MetaException {
+    String newFileName = signature + path.getName();
+    int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
+
+    if (newFileName.length() > maxLength) {
+      newFileName = newFileName.substring(0, maxLength-1);
+    }
+
+    Path cmPath = new Path(cmroot, newFileName);
+
+    return cmPath;
+  }
+
+  /**
+   * Thread that recursively clears expired files under cmroot
+   */
+  static class CMClearer implements Runnable {
+    private Path cmroot;
+    private long secRetain;
+    private Configuration conf;
+
+    CMClearer(String cmrootString, long secRetain, Configuration conf) {
+      this.cmroot = new Path(cmrootString);
+      this.secRetain = secRetain;
+      this.conf = conf;
+    }
+
+    @Override
+    public void run() {
+      try {
+        LOG.info("CMClearer started");
+        long now = System.currentTimeMillis();
+        processDir(cmroot, now);
+      } catch (IOException e) {
+        LOG.error("Exception when clearing cmroot:" + StringUtils.stringifyException(e));
+      }
+    }
+
+    private boolean processDir(Path folder, long now) throws IOException {
+      FileStatus[] files = folder.getFileSystem(conf).listStatus(folder);
+      boolean empty = true;
+      for (FileStatus file : files) {
+        if (file.isDirectory()) {
+          if (processDir(file.getPath(), now)) {
+            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Remove " + file.toString());
+            }
+          } else {
+            empty = false;
+          }
+        } else {
+          long modifiedTime = file.getModificationTime();
+          if (now - modifiedTime > secRetain*1000) {
+            file.getPath().getFileSystem(conf).delete(file.getPath(), false);
+          } else {
+            empty = false;
+          }
+        }
+      }
+      return empty;
+    }
+  }
+
+  // Schedule the CMClearer thread. Will be invoked by the metastore
+  public static void scheduleCMClearer(HiveConf hiveConf) {
+    if (hiveConf.getBoolVar(HiveConf.ConfVars.REPLCMENABLED)) {
+      ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(
+          new BasicThreadFactory.Builder()
+          .namingPattern("cmclearer-%d")
+          .daemon(true)
+          .build());
+      executor.scheduleAtFixedRate(new CMClearer(hiveConf.get(HiveConf.ConfVars.REPLCMDIR.varname),
+          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMRETIAN, TimeUnit.SECONDS), hiveConf),
+          0,
+          HiveConf.getTimeVar(hiveConf, ConfVars.REPLCMINTERVAL,
+          TimeUnit.SECONDS), TimeUnit.SECONDS);
+    }
+  }
+}
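
End to end, recycle pairs with getCMPath: recycle renames a file into cmroot
under a checksum-prefixed name, and getCMPath recomputes that location from
the original path plus the checksum. A minimal round-trip sketch (conf and
warehouse are assumed to come from a CM-enabled metastore, as in the test's
setUp; the file path is hypothetical):

    Path file = new Path("/user/hive/warehouse/db1.db/t1/part");
    String sig = ReplChangeManager.getCksumString(file, conf);  // capture before the move
    ReplChangeManager cm = ReplChangeManager.getInstance(conf, warehouse);
    cm.recycle(file);                                           // file now lives in cmroot
    Path inCM = ReplChangeManager.getCMPath(file, conf, sig);   // recomputable any time
    // copying inCM back to the original location restores the file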

http://git-wip-us.apache.org/repos/asf/hive/blob/b558d49b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 6aca1b7..549ca52 100755
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -448,11 +448,11 @@ public class Warehouse {
   }
 
   public Path getPartitionPath(Database db, String tableName,
-      LinkedHashMap<String, String> pm) throws MetaException {
+      Map<String, String> pm) throws MetaException {
     return new Path(getTablePath(db, tableName), makePartPath(pm));
   }
 
-  public Path getPartitionPath(Path tblPath, LinkedHashMap<String, String> pm)
+  public Path getPartitionPath(Path tblPath, Map<String, String> pm)
       throws MetaException {
     return new Path(tblPath, makePartPath(pm));
   }
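
The LinkedHashMap-to-Map relaxation above is what lets TestReplChangeManager
pass a Guava ImmutableMap. Since makePartPath builds the partition path from
the map's iteration order, callers should still supply an order-preserving
Map; a minimal sketch (db and warehouse as in the test):

    Map<String, String> spec = ImmutableMap.of("dt", "20160101"); // insertion-ordered
    Path partPath = warehouse.getPartitionPath(db, "t1", spec);   // .../t1/dt=20160101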