You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2018/07/17 14:09:27 UTC
hive git commit: HIVE-20152: reset db state, when repl dump fails,
so rename table can be done (Anishek Agarwal, reviewed by Sankar Hariappan)
Repository: hive
Updated Branches:
refs/heads/master dceeefbdf -> c1337dfb7
HIVE-20152: reset db state, when repl dump fails, so rename table can be done (Anishek Agarwal, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c1337dfb
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c1337dfb
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c1337dfb
Branch: refs/heads/master
Commit: c1337dfb73f0df6f9cd9f9ed7257917e37b38745
Parents: dceeefb
Author: Anishek Agarwal <an...@gmail.com>
Authored: Tue Jul 17 19:39:16 2018 +0530
Committer: Anishek Agarwal <an...@gmail.com>
Committed: Tue Jul 17 19:39:16 2018 +0530
----------------------------------------------------------------------
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 52 +++++---
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 4 +-
.../hive/ql/exec/repl/ReplDumpTaskTest.java | 126 +++++++++++++++++++
3 files changed, 166 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c1337dfb/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 82ecad1..79ee80a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.log.IncrementalDumpLogger;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,10 +217,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return rspec;
}
- private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
// bootstrap case
Hive hiveDb = getHive();
- Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ Long bootDumpBeginReplId = currentNotificationId(hiveDb);
String validTxnList = getValidTxnListForReplDump(hiveDb);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
@@ -231,16 +232,35 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
dumpFunctionMetadata(dbName, dumpRoot);
String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
- for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
- LOG.debug(
- "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
- dumpTable(dbName, tblName, validTxnList, dbRoot);
- dumpConstraintMetadata(dbName, tblName, dbRoot);
+ Exception caught = null;
+ try {
+ for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
+ LOG.debug(
+ "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+ dumpTable(dbName, tblName, validTxnList, dbRoot);
+ dumpConstraintMetadata(dbName, tblName, dbRoot);
+ }
+ } catch (Exception e) {
+ caught = e;
+ } finally {
+ try {
+ Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
+ } catch (Exception e) {
+ if (caught == null) {
+ throw e;
+ } else {
+ LOG.error("failed to reset the db state for " + uniqueKey
+ + " on failure of repl dump", e);
+ throw caught;
+ }
+ }
+ if(caught != null) {
+ throw caught;
+ }
}
- Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
replLogger.endLog(bootDumpBeginReplId.toString());
}
- Long bootDumpEndReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ Long bootDumpEndReplId = currentNotificationId(hiveDb);
LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
bootDumpEndReplId);
@@ -274,7 +294,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return bootDumpBeginReplId;
}
- private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
+ long currentNotificationId(Hive hiveDb) throws TException { // returns the event id of the current notification event, read through hiveDb.getMSC(); not private, and overridden by StubReplDumpTask in ReplDumpTaskTest
+ return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ }
+
+ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
Path dbRoot = new Path(dumpRoot, dbName);
// TODO : instantiating FS objects are generally costly. Refactor
FileSystem fs = dbRoot.getFileSystem(conf);
@@ -284,7 +308,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return dbRoot;
}
- private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
+ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
try {
Hive db = getHive();
HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -331,7 +355,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return openTxns;
}
- private String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
+ String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
// Key design point for REPL DUMP is to not have any txns older than current txn in which dump runs.
// This is needed to ensure that Repl dump doesn't copy any data files written by any open txns
// mainly for streaming ingest case where one delta file shall have data from committed/aborted/open txns.
@@ -396,7 +420,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
- private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+ void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
List<String> functionNames = getHive().getFunctions(dbName, "*");
for (String functionName : functionNames) {
@@ -415,7 +439,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
}
}
- private void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
+ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) throws Exception {
try {
Path constraintsRoot = new Path(dbRoot, CONSTRAINTS_ROOT_DIR_NAME);
Path commonConstraintsFile = new Path(constraintsRoot, ConstraintFileType.COMMON.getPrefix() + tblName);
http://git-wip-us.apache.org/repos/asf/hive/blob/c1337dfb/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 62d699f..59ffb90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -75,7 +75,7 @@ public class Utils {
}
}
- public static Iterable<? extends String> matchesDb(Hive db, String dbPattern) throws HiveException {
+ public static Iterable<String> matchesDb(Hive db, String dbPattern) throws HiveException {
if (dbPattern == null) {
return db.getAllDatabases();
} else {
@@ -83,7 +83,7 @@ public class Utils {
}
}
- public static Iterable<? extends String> matchesTbl(Hive db, String dbName, String tblPattern)
+ public static Iterable<String> matchesTbl(Hive db, String dbName, String tblPattern)
throws HiveException {
if (tblPattern == null) {
return getAllTables(db, dbName);
http://git-wip-us.apache.org/repos/asf/hive/blob/c1337dfb/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
new file mode 100644
index 0000000..7bd035e
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
@@ -0,0 +1,126 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.powermock.api.mockito.PowerMockito.verifyStatic;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ Utils.class })
+@PowerMockIgnore({ "javax.management.*" })
+public class ReplDumpTaskTest {
+ /*
+ * Exercises ReplDumpTask.bootStrapDump with the static methods of Utils mocked through
+ * PowerMock, checking that Utils.resetDbBootstrapDumpState is still called when dumping a
+ * table throws.
+ */
+ @Mock
+ private Hive hive;
+ /**
+ * ReplDumpTask subclass that overrides several helper methods with fixed return values or
+ * empty bodies, and hands out the mocked {@code hive} field from getHive(). dumpTable is not
+ * overridden here; the test below supplies its own version.
+ */
+ class StubReplDumpTask extends ReplDumpTask {
+
+ @Override
+ protected Hive getHive() {
+ return hive;
+ }
+
+ @Override
+ long currentNotificationId(Hive hiveDb) {
+ return Long.MAX_VALUE;
+ }
+
+ @Override
+ String getValidTxnListForReplDump(Hive hiveDb) {
+ return "";
+ }
+
+ @Override
+ void dumpFunctionMetadata(String dbName, Path dumpRoot) {
+ }
+
+ @Override
+ Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) {
+ return Mockito.mock(Path.class);
+ }
+
+ @Override
+ void dumpConstraintMetadata(String dbName, String tblName, Path dbRoot) {
+ }
+ }
+ /** Marker exception thrown by the stubbed dumpTable so the test can expect exactly this type. */
+ private static class TestException extends Exception {
+ }
+ /**
+ * Runs bootStrapDump over two tables where the second dumpTable call throws TestException,
+ * then verifies that Utils.resetDbBootstrapDumpState was called with the key returned by the
+ * stubbed Utils.setDbBootstrapDumpState. The TestException itself is expected to propagate.
+ */
+ @Test(expected = TestException.class)
+ public void removeDBPropertyToPreventRenameWhenBootstrapDumpOfTableFails() throws Exception {
+ List<String> tableList = Arrays.asList("a1", "a2");
+ String dbRandomKey = "akeytoberandom";
+ // Replace the static methods of Utils with mocks, then stub the lookups for the "default" db.
+ mockStatic(Utils.class);
+ when(Utils.matchesDb(same(hive), eq("default")))
+ .thenReturn(Collections.singletonList("default"));
+ when(Utils.getAllTables(same(hive), eq("default"))).thenReturn(tableList);
+ when(Utils.setDbBootstrapDumpState(same(hive), eq("default"))).thenReturn(dbRandomKey);
+ when(Utils.matchesTbl(same(hive), eq("default"), anyString())).thenReturn(tableList);
+
+ // NOTE(review): StubReplDumpTask.dumpFunctionMetadata has an empty body, so this stub may be unused - confirm.
+ when(hive.getAllFunctions()).thenReturn(Collections.emptyList());
+ // Task whose dumpTable does nothing on the first call and throws TestException on every later call.
+ ReplDumpTask task = new StubReplDumpTask() {
+ private int tableDumpCount = 0;
+
+ @Override
+ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot)
+ throws Exception {
+ tableDumpCount++;
+ if (tableDumpCount > 1) {
+ throw new TestException();
+ }
+ }
+ };
+ // NOTE(review): presumably "default" is the db pattern and "" the table pattern - confirm against the ReplDumpWork constructor.
+ task.setWork(
+ new ReplDumpWork("default", "",
+ Long.MAX_VALUE, Long.MAX_VALUE, "",
+ Integer.MAX_VALUE, "")
+ );
+ // The verification sits in finally so it still runs although bootStrapDump is expected to throw.
+ try {
+ task.bootStrapDump(mock(Path.class), null, mock(Path.class));
+ } finally {
+ verifyStatic();
+ Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));
+ }
+ }
+}