Posted to commits@hive.apache.org by an...@apache.org on 2018/03/12 06:25:51 UTC
hive git commit: HIVE-18832: Support change management for trashing
data files from ACID tables. (Anishek Agarwal, reviewed by Sankar Hariappan)
Repository: hive
Updated Branches:
refs/heads/master d2cb97b6f -> 53df7e881
HIVE-18832: Support change management for trashing data files from ACID tables. (Anishek Agarwal, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/53df7e88
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/53df7e88
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/53df7e88
Branch: refs/heads/master
Commit: 53df7e881723827a29a783fcbff67a16929950ec
Parents: d2cb97b
Author: Anishek Agarwal <an...@gmail.com>
Authored: Mon Mar 12 11:55:42 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Mon Mar 12 11:55:42 2018 +0530
----------------------------------------------------------------------
.../hadoop/hive/ql/parse/WarehouseInstance.java | 8 +-
.../compactor/TestCleanerWithReplication.java | 198 +++++++++++++++++++
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 25 ++-
.../hive/ql/txn/compactor/CompactorTest.java | 9 +-
.../hive/ql/txn/compactor/TestCleaner.java | 8 -
.../hive/ql/txn/compactor/TestCleaner2.java | 3 -
.../hive/ql/txn/compactor/TestInitiator.java | 8 -
.../hive/ql/txn/compactor/TestWorker.java | 4 -
.../hive/ql/txn/compactor/TestWorker2.java | 4 -
.../hive/metastore/ReplChangeManager.java | 163 ++++++++-------
.../apache/hadoop/hive/metastore/Warehouse.java | 20 +-
11 files changed, 319 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index 33e5157..feb1191 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -56,7 +56,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-class WarehouseInstance implements Closeable {
+public class WarehouseInstance implements Closeable {
final String functionsRoot;
private Logger logger;
private IDriver driver;
@@ -85,7 +85,7 @@ class WarehouseInstance implements Closeable {
initialize(cmRootPath.toString(), warehouseRoot.toString(), overridesForHiveConf);
}
- WarehouseInstance(Logger logger, MiniDFSCluster cluster,
+ public WarehouseInstance(Logger logger, MiniDFSCluster cluster,
Map<String, String> overridesForHiveConf) throws Exception {
this(logger, cluster, overridesForHiveConf, null);
}
@@ -165,7 +165,7 @@ class WarehouseInstance implements Closeable {
return (lastResults.get(0).split("\\t"))[colNum];
}
- WarehouseInstance run(String command) throws Throwable {
+ public WarehouseInstance run(String command) throws Throwable {
CommandProcessorResponse ret = driver.run(command);
if (ret.getException() != null) {
throw ret.getException();
@@ -257,7 +257,7 @@ class WarehouseInstance implements Closeable {
return this;
}
- List<String> getOutput() throws IOException {
+ public List<String> getOutput() throws IOException {
List<String> results = new ArrayList<>();
driver.getResults(results);
return results;
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
new file mode 100644
index 0000000..c0751a7
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.security.auth.login.LoginException;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCleanerWithReplication extends CompactorTest {
+ private Path cmRootDirectory;
+ private static FileSystem fs;
+ private static MiniDFSCluster miniDFSCluster;
+
+ @Before
+ public void setup() throws Exception {
+ conf = new HiveConf();
+ TxnDbUtil.setConfValues(conf);
+ TxnDbUtil.cleanDb(conf);
+ conf.set("fs.defaultFS", fs.getUri().toString());
+ conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+ ms = new HiveMetaStoreClient(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
+ cmRootDirectory = new Path(conf.get(HiveConf.ConfVars.REPLCMDIR.varname));
+ if (!fs.exists(cmRootDirectory)) {
+ fs.mkdirs(cmRootDirectory);
+ }
+ tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
+ }
+
+ @BeforeClass
+ public static void classLevelSetup() throws LoginException, IOException {
+ Configuration hadoopConf = new Configuration();
+ hadoopConf.set("dfs.client.use.datanode.hostname", "true");
+ hadoopConf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
+ miniDFSCluster =
+ new MiniDFSCluster.Builder(hadoopConf).numDataNodes(1).format(true).build();
+ fs = miniDFSCluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ fs.delete(cmRootDirectory, true);
+ compactorTestCleanup();
+ }
+
+ @AfterClass
+ public static void tearDownClass() {
+ miniDFSCluster.shutdown();
+ }
+
+ @Test
+ public void cleanupAfterMajorTableCompaction() throws Exception {
+ Table t = newTable("default", "camtc", false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 25L, 25);
+
+ burnThroughTransactions("default", "camtc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ assertCleanerActions(6);
+ }
+
+ @Test
+ public void cleanupAfterMajorPartitionCompaction() throws Exception {
+ Table t = newTable("default", "campc", true);
+ Partition p = newPartition(t, "today");
+
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addBaseFile(t, p, 25L, 25);
+
+ burnThroughTransactions("default", "campc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "campc", CompactionType.MAJOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ assertCleanerActions(6);
+ }
+
+ @Test
+ public void cleanupAfterMinorTableCompaction() throws Exception {
+ Table t = newTable("default", "camitc", false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addDeltaFile(t, null, 21L, 24L, 4);
+
+ burnThroughTransactions("default", "camitc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camitc", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ assertCleanerActions(4);
+ }
+
+ @Test
+ public void cleanupAfterMinorPartitionCompaction() throws Exception {
+ Table t = newTable("default", "camipc", true);
+ Partition p = newPartition(t, "today");
+
+ addBaseFile(t, p, 20L, 20);
+ addDeltaFile(t, p, 21L, 22L, 2);
+ addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 21L, 24L, 4);
+
+ burnThroughTransactions("default", "camipc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "camipc", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markCompacted(ci);
+ txnHandler.setRunAs(ci.id, System.getProperty("user.name"));
+
+ assertCleanerActions(4);
+ }
+
+ private void assertCleanerActions(int expectedNumOfCleanedFiles) throws Exception {
+ assertEquals("there should be no deleted files in cm root", 0,
+ fs.listStatus(cmRootDirectory).length);
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ String state = rsp.getCompacts().get(0).getState();
+ Assert.assertTrue("unexpected state " + state, TxnStore.SUCCEEDED_RESPONSE.equals(state));
+
+ assertEquals(
+ "there should be " + expectedNumOfCleanedFiles + " deleted files in cm root",
+ expectedNumOfCleanedFiles, fs.listStatus(cmRootDirectory).length
+ );
+ }
+
+ @Override
+ boolean useHive130DeltaDirName() {
+ return false;
+ }
+}
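The expected counts (6 after a major compaction, 4 after a minor one) correspond to the obsolete base/delta files that the Cleaner now moves into the cm root instead of discarding outright. A hedged sketch of the two configuration knobs the setup() above relies on (the cm-root path below is illustrative):

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);             // turn change management on
    conf.set(HiveConf.ConfVars.REPLCMDIR.varname, "/user/hive/cmroot"); // where recycled files land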
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index af0884c..df9a5a0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A class to clean directories after compactions. This will run in a separate thread.
@@ -55,12 +57,18 @@ import java.util.concurrent.TimeUnit;
public class Cleaner extends CompactorThread {
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
private long cleanerCheckInterval = 0;
+ private ReplChangeManager replChangeManager;
// List of compactions to clean.
- private Map<Long, Set<Long>> compactId2LockMap = new HashMap<Long, Set<Long>>();
- private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<Long, CompactionInfo>();
+ private Map<Long, Set<Long>> compactId2LockMap = new HashMap<>();
+ private Map<Long, CompactionInfo> compactId2CompactInfoMap = new HashMap<>();
+
+ @Override
+ public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
+ super.init(stop, looped);
+ replChangeManager = ReplChangeManager.getInstance(conf);
+ }
@Override
public void run() {
@@ -245,10 +253,10 @@ public class Cleaner extends CompactorThread {
* Each Compaction only compacts as far as the highest txn id such that all txns below it
* are resolved (i.e. not opened). This is what "highestWriteId" tracks. This is only tracked
* since Hive 1.3.0/2.0 - thus may be 0. See ValidCompactorWriteIdList and uses for more info.
- *
+ *
* We only want to clean up to the highestWriteId - otherwise we risk deleting deltas from
* under an active reader.
- *
+ *
* Suppose we have deltas D2 D3 for table T, i.e. the last compaction created D3 so now there is a
* clean request for D2.
* Cleaner checks existing locks and finds none.
@@ -258,8 +266,9 @@ public class Cleaner extends CompactorThread {
* unless ValidTxnList is "capped" at highestWriteId.
*/
final ValidWriteIdList txnList = (ci.highestWriteId > 0)
- ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId)
- : new ValidReaderWriteIdList();
+ ? new ValidReaderWriteIdList(ci.getFullTableName(), new long[0], new BitSet(),
+ ci.highestWriteId)
+ : new ValidReaderWriteIdList();
if (runJobAsSelf(ci.runAs)) {
removeFiles(location, txnList);
@@ -306,8 +315,8 @@ public class Cleaner extends CompactorThread {
for (Path dead : filesToDelete) {
LOG.debug("Going to delete path " + dead.toString());
+ replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
fs.delete(dead, true);
}
}
-
}
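The net effect of the Cleaner change, as a minimal sketch (names from the diff; error handling elided): init() obtains the ReplChangeManager singleton once, and the file-removal loop recycles each obsolete path into the cm root before deleting it, so a replication consumer can still retrieve the data. recycle() returns immediately when change management is disabled.

    ReplChangeManager replChangeManager = ReplChangeManager.getInstance(conf); // done once in init()
    for (Path dead : filesToDelete) {
      // move the obsolete file into the cm root, then drop it from the table location
      replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
      fs.delete(dead, true);
    }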
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 7ea017a..083c671 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.thrift.TException;
+import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,19 +93,19 @@ public abstract class CompactorTest {
protected TxnStore txnHandler;
protected IMetaStoreClient ms;
- protected long sleepTime = 1000;
protected HiveConf conf;
private final AtomicBoolean stop = new AtomicBoolean();
- private final File tmpdir;
+ protected File tmpdir;
- protected CompactorTest() throws Exception {
+ @Before
+ public void setup() throws Exception {
conf = new HiveConf();
TxnDbUtil.setConfValues(conf);
TxnDbUtil.cleanDb(conf);
ms = new HiveMetaStoreClient(conf);
txnHandler = TxnUtils.getTxnStore(conf);
- tmpdir = new File (Files.createTempDirectory("compactor_test_table_").toString());
+ tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
}
protected void compactorTestCleanup() throws IOException {
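Moving the fixture initialization from the CompactorTest constructor into a JUnit 4 @Before method is what lets TestCleanerWithReplication supply its own setup(): an overriding @Before method runs in place of the base one. A generic sketch of the pattern, with hypothetical class names:

    public abstract class BaseFixture {
      protected HiveConf conf;

      @Before
      public void setup() throws Exception {
        conf = new HiveConf();                  // default per-test initialization
      }
    }

    public class ReplicationAwareTest extends BaseFixture {
      @Override
      @Before
      public void setup() throws Exception {    // replaces the base setup() entirely
        conf = new HiveConf();
        conf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); // CM-specific tweak
      }
    }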
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 3ca073c..ce574b4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -54,12 +52,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class TestCleaner extends CompactorTest {
- static final private Logger LOG = LoggerFactory.getLogger(TestCleaner.class.getName());
-
- public TestCleaner() throws Exception {
- super();
- }
-
@Test
public void nothing() throws Exception {
// Test that the whole things works when there's nothing in the queue. This is just a
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
index 1cbd9bc..e2aeb9c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner2.java
@@ -21,9 +21,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
* Same as TestCleaner but tests delta file names in Hive 1.3.0 format
*/
public class TestCleaner2 extends TestCleaner {
- public TestCleaner2() throws Exception {
- super();
- }
@Override
boolean useHive130DeltaDirName() {
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index d2818db..5648393 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -41,8 +41,6 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -55,12 +53,6 @@ import java.util.concurrent.TimeUnit;
* Tests for the compactor Initiator thread.
*/
public class TestInitiator extends CompactorTest {
- static final private String CLASS_NAME = TestInitiator.class.getName();
- static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-
- public TestInitiator() throws Exception {
- super();
- }
@Test
public void nothing() throws Exception {
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 1d9c9a7..488cd90 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -65,10 +65,6 @@ public class TestWorker extends CompactorTest {
static final private String CLASS_NAME = TestWorker.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- public TestWorker() throws Exception {
- super();
- }
-
@Test
public void nothing() throws Exception {
// Test that the whole things works when there's nothing in the queue. This is just a
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
index f07c050..b6b8788 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker2.java
@@ -22,10 +22,6 @@ package org.apache.hadoop.hive.ql.txn.compactor;
*/
public class TestWorker2 extends TestWorker {
- public TestWorker2() throws Exception {
- super();
- }
-
@Override
boolean useHive130DeltaDirName() {
return true;
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 95fa0a9..7c1d5f5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -163,104 +163,99 @@ public class ReplChangeManager {
* @param ifPurge whether the file should skip Trash when the source file is moved/deleted.
* This is consulted only if type is MOVE.
* @return int
- * @throws MetaException
+ * @throws IOException
*/
- int recycle(Path path, RecycleType type, boolean ifPurge) throws MetaException {
+ public int recycle(Path path, RecycleType type, boolean ifPurge) throws IOException {
if (!enabled) {
return 0;
}
- try {
- int count = 0;
-
- if (fs.isDirectory(path)) {
- FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
- for (FileStatus file : files) {
- count += recycle(file.getPath(), type, ifPurge);
- }
+ int count = 0;
+ if (fs.isDirectory(path)) {
+ FileStatus[] files = fs.listStatus(path, hiddenFileFilter);
+ for (FileStatus file : files) {
+ count += recycle(file.getPath(), type, ifPurge);
+ }
+ } else {
+ String fileCheckSum = checksumFor(path, fs);
+ Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
+
+ // set the timestamp before moving to cmroot, so we can
+ // avoid a race where the CM cleaner removes the file before
+ // the timestamp is set
+ long now = System.currentTimeMillis();
+ fs.setTimes(path, now, -1);
+
+ boolean success = false;
+ if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
+ // If a file with the same checksum already exists in cmPath, just skip the copy/move.
+ // Also mark the operation unsuccessful, signalling that a file with the same name
+ // already exists; this ensures the timestamp of cmPath is updated to avoid clean-up
+ // by the CM cleaner.
+ success = false;
} else {
- String fileCheckSum = checksumFor(path, fs);
- Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);
-
- // set timestamp before moving to cmroot, so we can
- // avoid race condition CM remove the file before setting
- // timestamp
- long now = System.currentTimeMillis();
- fs.setTimes(path, now, -1);
-
- boolean success = false;
- if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
- // If already a file with same checksum exists in cmPath, just ignore the copy/move
- // Also, mark the operation is unsuccessful to notify that file with same name already
- // exist which will ensure the timestamp of cmPath is updated to avoid clean-up by
- // CM cleaner.
- success = false;
- } else {
- switch (type) {
- case MOVE: {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
- }
- // Rename fails if the file with same name already exist.
- success = fs.rename(path, cmPath);
- break;
- }
- case COPY: {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
- }
- // It is possible to have a file with same checksum in cmPath but the content is
- // partially copied or corrupted. In this case, just overwrite the existing file with
- // new one.
- success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
- break;
- }
- default:
- // Operation fails as invalid input
- break;
+ switch (type) {
+ case MOVE: {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Moving {} to {}", path.toString(), cmPath.toString());
}
+ // Rename fails if a file with the same name already exists.
+ success = fs.rename(path, cmPath);
+ break;
}
-
- // Ignore if a file with same content already exist in cmroot
- // We might want to setXAttr for the new location in the future
- if (success) {
- // set the file owner to hive (or the id metastore run as)
- fs.setOwner(cmPath, msUser, msGroup);
-
- // tag the original file name so we know where the file comes from
- // Note we currently only track the last known trace as
- // xattr has limited capacity. We shall revisit and store all original
- // locations if orig-loc becomes important
- try {
- fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
- } catch (UnsupportedOperationException e) {
- LOG.warn("Error setting xattr for {}", path.toString());
- }
-
- count++;
- } else {
+ case COPY: {
if (LOG.isDebugEnabled()) {
- LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
+ LOG.debug("Copying {} to {}", path.toString(), cmPath.toString());
}
- // Need to extend the tenancy if we saw a newer file with the same content
- fs.setTimes(cmPath, now, -1);
+ // It is possible to have a file with the same checksum in cmPath whose content is
+ // partially copied or corrupted. In this case, just overwrite the existing file with
+ // the new one.
+ success = FileUtils.copy(fs, path, fs, cmPath, false, true, conf);
+ break;
}
+ default:
+ // invalid type: the operation fails
+ break;
+ }
+ }
- // Tag if we want to remain in trash after deletion.
- // If multiple files share the same content, then
- // any file claim remain in trash would be granted
- if ((type == RecycleType.MOVE) && !ifPurge) {
- try {
- fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
- } catch (UnsupportedOperationException e) {
- LOG.warn("Error setting xattr for {}", cmPath.toString());
- }
+ // Ignore if a file with the same content already exists in cmroot.
+ // We might want to setXAttr for the new location in the future.
+ if (success) {
+ // set the file owner to hive (or the id the metastore runs as)
+ fs.setOwner(cmPath, msUser, msGroup);
+
+ // tag the original file name so we know where the file comes from
+ // Note we currently only track the last known trace as
+ // xattr has limited capacity. We shall revisit and store all original
+ // locations if orig-loc becomes important
+ try {
+ fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error setting xattr for {}", path.toString());
+ }
+
+ count++;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("A file with the same content of {} already exists, ignore", path.toString());
+ }
+ // Need to extend the tenancy if we saw a newer file with the same content
+ fs.setTimes(cmPath, now, -1);
+ }
+
+ // Tag the file if it should remain in trash after deletion.
+ // If multiple files share the same content, then a
+ // remain-in-trash claim by any of them is granted.
+ if ((type == RecycleType.MOVE) && !ifPurge) {
+ try {
+ fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[] { 0 });
+ } catch (UnsupportedOperationException e) {
+ LOG.warn("Error setting xattr for {}", cmPath.toString());
}
}
- return count;
- } catch (IOException e) {
- throw new MetaException(StringUtils.stringifyException(e));
}
+ return count;
}
// Get checksum of a file
@@ -289,7 +284,7 @@ public class ReplChangeManager {
* @param checkSum checksum of the file, can be retrieved by {@link #checksumFor(Path, FileSystem)}
* @return Path
*/
- static Path getCMPath(Configuration conf, String name, String checkSum) throws IOException, MetaException {
+ static Path getCMPath(Configuration conf, String name, String checkSum) {
String newFileName = name + "_" + checkSum;
int maxLength = conf.getInt(DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_COMPONENT_LENGTH_DEFAULT);
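recycle() is content-addressed: getCMPath() names the cm copy after the original file plus its checksum, truncated to the HDFS maximum component length, so recycling identical content twice collides on purpose and only refreshes the existing copy's timestamp. A condensed sketch of the branch logic above (xattr tagging and ownership handling elided):

    String fileCheckSum = checksumFor(path, fs);
    Path cmPath = getCMPath(conf, path.getName(), fileCheckSum);  // "<name>_<checksum>"
    if (fs.exists(cmPath) && fileCheckSum.equalsIgnoreCase(checksumFor(cmPath, fs))) {
      fs.setTimes(cmPath, System.currentTimeMillis(), -1);        // same content: extend its tenancy
    } else if (type == RecycleType.MOVE) {
      fs.rename(path, cmPath);                                    // trash by moving into the cm root
    } else if (type == RecycleType.COPY) {
      FileUtils.copy(fs, path, fs, cmPath, false, true, conf);    // overwrite a partial/corrupt copy
    }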
http://git-wip-us.apache.org/repos/asf/hive/blob/53df7e88/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 445a7b8..d4a0819 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -220,7 +220,11 @@ public class Warehouse {
}
void addToChangeManagement(Path file) throws MetaException {
- cm.recycle(file, RecycleType.COPY, true);
+ try {
+ cm.recycle(file, RecycleType.COPY, true);
+ } catch (IOException e) {
+ throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
}
public boolean deleteDir(Path f, boolean recursive) throws MetaException {
@@ -234,15 +238,23 @@ public class Warehouse {
public boolean deleteDir(Path f, boolean recursive, boolean ifPurge, boolean needCmRecycle) throws MetaException {
// no need to create the CM recycle file for temporary tables
if (needCmRecycle) {
- cm.recycle(f, RecycleType.MOVE, ifPurge);
+
+ try {
+ cm.recycle(f, RecycleType.MOVE, ifPurge);
+ } catch (IOException e) {
+ throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
}
FileSystem fs = getFs(f);
return fsHandler.deleteDir(fs, f, recursive, ifPurge, conf);
}
public void recycleDirToCmPath(Path f, boolean ifPurge) throws MetaException {
- cm.recycle(f, RecycleType.MOVE, ifPurge);
- return;
+ try {
+ cm.recycle(f, RecycleType.MOVE, ifPurge);
+ } catch (IOException e) {
+ throw new MetaException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
}
public boolean isEmpty(Path path) throws IOException, MetaException {
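All three Warehouse call sites wrap the IOException that recycle() now throws, preserving the MetaException contract of the metastore API. A hedged caller-side sketch (assuming the usual Warehouse construction; the path is illustrative):

    Warehouse wh = new Warehouse(conf);  // throws MetaException on misconfiguration
    // recycle into the cm root, then delete; ifPurge additionally skips the HDFS trash
    wh.deleteDir(new Path("/warehouse/db/t"), /* recursive */ true, /* ifPurge */ true, /* needCmRecycle */ true);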