Posted to commits@hive.apache.org by da...@apache.org on 2017/07/05 18:03:10 UTC
hive git commit: HIVE-16893: move replication dump related work in
semantic analysis phase to execution phase using a task (Anishek Agarwal,
reviewed by Sankar Hariappan, Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/master ae927155d -> c39b87958
HIVE-16893: move replication dump related work in semantic analysis phase to execution phase using a task (Anishek Agarwal, reviewed by Sankar Hariappan, Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c39b8795
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c39b8795
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c39b8795
Branch: refs/heads/master
Commit: c39b879581325e7e9e4df0b8531f80e34ab11d91
Parents: ae92715
Author: Daniel Dai <da...@hortonworks.com>
Authored: Wed Jul 5 11:02:38 2017 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Wed Jul 5 11:02:38 2017 -0700
----------------------------------------------------------------------
.../hive/ql/parse/TestReplicationScenarios.java | 3 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 3 +-
ql/if/queryplan.thrift | 1 +
ql/src/gen/thrift/gen-cpp/queryplan_types.cpp | 8 +-
ql/src/gen/thrift/gen-cpp/queryplan_types.h | 3 +-
.../hadoop/hive/ql/plan/api/StageType.java | 5 +-
ql/src/gen/thrift/gen-php/Types.php | 2 +
ql/src/gen/thrift/gen-py/queryplan/ttypes.py | 3 +
ql/src/gen/thrift/gen-rb/queryplan_types.rb | 5 +-
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 4 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 347 +++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpWork.java | 75 ++++
.../hive/ql/parse/ExportSemanticAnalyzer.java | 4 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 305 +---------------
.../hive/ql/parse/repl/dump/TableExport.java | 32 +-
.../parse/TestReplicationSemanticAnalyzer.java | 71 ++--
16 files changed, 507 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
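In outline, the patch splits REPL DUMP into a thin analysis step and a new execution-time task. A condensed sketch of the resulting flow, pieced together from the hunks below (not literal code from any single file; conf is the session HiveConf):

    // Analysis phase (ReplicationSemanticAnalyzer.analyzeReplDump): only package the request.
    rootTasks.add(TaskFactory.get(
        new ReplDumpWork(dbNameOrPattern, tblNameOrPattern, eventFrom, eventTo,
            ErrorMsg.INVALID_PATH.getMsg(ast), maxEventLimit,
            ctx.getResFile().toUri().toString()),
        conf));
    setFetchTask(createFetchTask("dump_dir,last_repl_id#string,string"));

    // Execution phase (ReplDumpTask.execute): do the actual dump and report the results.
    Long lastReplId = work.isBootStrapDump()
        ? bootStrapDump(dumpRoot, dmd, cmRoot)      // eventFrom == null
        : incrementalDump(dumpRoot, dmd, cmRoot);   // REPL DUMP ... FROM <eventFrom> [TO <eventTo>]
    prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(),
        String.valueOf(lastReplId)), dumpSchema);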
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 0591dfd..fa68896 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil
import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec.ReplStateMap;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -159,7 +160,7 @@ public class TestReplicationScenarios {
private static int next = 0;
private synchronized void advanceDumpDir() {
next++;
- ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+ ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
}
static class Tuple {
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index d6b97e8..1128eae 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.parse.repl.PathBuilder;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -131,7 +132,7 @@ class WarehouseInstance implements Closeable {
private void advanceDumpDir() {
next++;
- ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+ ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
}
private ArrayList<String> lastResults;
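Both test helpers now poke the injection hook on ReplDumpWork rather than on the analyzer. The hook matters because ReplDumpTask.getNextDumpDir() (further down in this patch) returns the injected value when hive.in.test is set, so every dump in a test lands in a fresh, predictable directory instead of a timestamped one. A minimal sketch of that interaction; the conf setup and the literal "1" are illustrative:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, true);
    // Inject the next directory name before issuing REPL DUMP; getNextDumpDir()
    // inside ReplDumpTask hands this back instead of String.valueOf(System.currentTimeMillis()).
    ReplDumpWork.injectNextDumpDirForTest("1");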
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/if/queryplan.thrift
----------------------------------------------------------------------
diff --git a/ql/if/queryplan.thrift b/ql/if/queryplan.thrift
index cc9af94..dc55805 100644
--- a/ql/if/queryplan.thrift
+++ b/ql/if/queryplan.thrift
@@ -100,6 +100,7 @@ enum StageType {
STATS,
DEPENDENCY_COLLECTION,
COLUMNSTATS,
+ REPLDUMP,
}
struct Stage {
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
index e92d776..7254d50 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.cpp
@@ -115,7 +115,8 @@ int _kStageTypeValues[] = {
StageType::MOVE,
StageType::STATS,
StageType::DEPENDENCY_COLLECTION,
- StageType::COLUMNSTATS
+ StageType::COLUMNSTATS,
+ StageType::REPLDUMP
};
const char* _kStageTypeNames[] = {
"CONDITIONAL",
@@ -129,9 +130,10 @@ const char* _kStageTypeNames[] = {
"MOVE",
"STATS",
"DEPENDENCY_COLLECTION",
- "COLUMNSTATS"
+ "COLUMNSTATS",
+ "REPLDUMP"
};
-const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(12, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
+const std::map<int, const char*> _StageType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(13, _kStageTypeValues, _kStageTypeNames), ::apache::thrift::TEnumIterator(-1, NULL, NULL));
Adjacency::~Adjacency() throw() {
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-cpp/queryplan_types.h
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-cpp/queryplan_types.h b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
index ce37b2e..38d054b 100644
--- a/ql/src/gen/thrift/gen-cpp/queryplan_types.h
+++ b/ql/src/gen/thrift/gen-cpp/queryplan_types.h
@@ -93,7 +93,8 @@ struct StageType {
MOVE = 8,
STATS = 9,
DEPENDENCY_COLLECTION = 10,
- COLUMNSTATS = 11
+ COLUMNSTATS = 11,
+ REPLDUMP = 12
};
};
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index f20174c..deca574 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -23,7 +23,8 @@ public enum StageType implements org.apache.thrift.TEnum {
MOVE(8),
STATS(9),
DEPENDENCY_COLLECTION(10),
- COLUMNSTATS(11);
+ COLUMNSTATS(11),
+ REPLDUMP(12);
private final int value;
@@ -68,6 +69,8 @@ public enum StageType implements org.apache.thrift.TEnum {
return DEPENDENCY_COLLECTION;
case 11:
return COLUMNSTATS;
+ case 12:
+ return REPLDUMP;
default:
return null;
}
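The regenerated Java binding ties REPLDUMP to wire value 12 in both directions; the switch above is the lookup Thrift generates for TEnum implementations (findByValue). A quick hedged round-trip check against org.apache.hadoop.hive.ql.plan.api.StageType:

    assert StageType.REPLDUMP.getValue() == 12;
    assert StageType.findByValue(12) == StageType.REPLDUMP;
    assert StageType.findByValue(13) == null;   // unknown wire values map to null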
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-php/Types.php
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-php/Types.php b/ql/src/gen/thrift/gen-php/Types.php
index e1693f3..4d902ee 100644
--- a/ql/src/gen/thrift/gen-php/Types.php
+++ b/ql/src/gen/thrift/gen-php/Types.php
@@ -114,6 +114,7 @@ final class StageType {
const STATS = 9;
const DEPENDENCY_COLLECTION = 10;
const COLUMNSTATS = 11;
+ const REPLDUMP = 12;
static public $__names = array(
0 => 'CONDITIONAL',
1 => 'COPY',
@@ -127,6 +128,7 @@ final class StageType {
9 => 'STATS',
10 => 'DEPENDENCY_COLLECTION',
11 => 'COLUMNSTATS',
+ 12 => 'REPLDUMP',
);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
index 2073959..9e29129 100644
--- a/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
+++ b/ql/src/gen/thrift/gen-py/queryplan/ttypes.py
@@ -160,6 +160,7 @@ class StageType:
STATS = 9
DEPENDENCY_COLLECTION = 10
COLUMNSTATS = 11
+ REPLDUMP = 12
_VALUES_TO_NAMES = {
0: "CONDITIONAL",
@@ -174,6 +175,7 @@ class StageType:
9: "STATS",
10: "DEPENDENCY_COLLECTION",
11: "COLUMNSTATS",
+ 12: "REPLDUMP",
}
_NAMES_TO_VALUES = {
@@ -189,6 +191,7 @@ class StageType:
"STATS": 9,
"DEPENDENCY_COLLECTION": 10,
"COLUMNSTATS": 11,
+ "REPLDUMP": 12,
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/gen/thrift/gen-rb/queryplan_types.rb
----------------------------------------------------------------------
diff --git a/ql/src/gen/thrift/gen-rb/queryplan_types.rb b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
index f8b4034..1433d4a 100644
--- a/ql/src/gen/thrift/gen-rb/queryplan_types.rb
+++ b/ql/src/gen/thrift/gen-rb/queryplan_types.rb
@@ -72,8 +72,9 @@ module StageType
STATS = 9
DEPENDENCY_COLLECTION = 10
COLUMNSTATS = 11
- VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS"}
- VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS]).freeze
+ REPLDUMP = 12
+ VALUE_MAP = {0 => "CONDITIONAL", 1 => "COPY", 2 => "DDL", 3 => "MAPRED", 4 => "EXPLAIN", 5 => "FETCH", 6 => "FUNC", 7 => "MAPREDLOCAL", 8 => "MOVE", 9 => "STATS", 10 => "DEPENDENCY_COLLECTION", 11 => "COLUMNSTATS", 12 => "REPLDUMP"}
+ VALID_VALUES = Set.new([CONDITIONAL, COPY, DDL, MAPRED, EXPLAIN, FETCH, FUNC, MAPREDLOCAL, MOVE, STATS, DEPENDENCY_COLLECTION, COLUMNSTATS, REPLDUMP]).freeze
end
class Adjacency
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index d61a460..94d6c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -25,6 +25,8 @@ import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.index.IndexMetadataChangeTask;
@@ -108,7 +110,7 @@ public final class TaskFactory {
IndexMetadataChangeTask.class));
taskvec.add(new TaskTuple<TezWork>(TezWork.class, TezTask.class));
taskvec.add(new TaskTuple<SparkWork>(SparkWork.class, SparkTask.class));
-
+ taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
}
private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
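The new TaskTuple is what lets the factory turn a ReplDumpWork into a ReplDumpTask by class lookup, exactly as TezWork maps to TezTask in the lines above. A small sketch of the mapping; the db name, error-message prefix and result path are placeholders, conf is a HiveConf, and the real call appears in the ReplicationSemanticAnalyzer hunk further down:

    ReplDumpWork work = new ReplDumpWork("somedb", null /*table pattern*/,
        null /*eventFrom*/, null /*eventTo*/,
        "REPL DUMP somedb", null /*maxEventLimit*/, "/tmp/repl_dump_result");
    Task<ReplDumpWork> task = TaskFactory.get(work, conf);
    // task is a ReplDumpTask (StageType.REPLDUMP); the dump now runs at execution time.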
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
new file mode 100644
index 0000000..f9bdff8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -0,0 +1,347 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.collect.Collections2;
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
+import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
+import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
+import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
+import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
+ private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
+ private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
+ private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
+
+ private final static String TMP_TABLE_PREFIX =
+ SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
+
+ private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
+ private Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
+
+ @Override
+ public String getName() {
+ return "REPL_DUMP";
+ }
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ try {
+ Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
+ DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
+ Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+ Long lastReplId;
+ if (work.isBootStrapDump()) {
+ lastReplId = bootStrapDump(dumpRoot, dmd, cmRoot);
+ } else {
+ lastReplId = incrementalDump(dumpRoot, dmd, cmRoot);
+ }
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
+ } catch (Exception e) {
+ LOG.error("failed", e);
+ return 1;
+ }
+ return 0;
+ }
+
+ private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+ LOG.debug("prepareReturnValues : " + schema);
+ for (String s : values) {
+ LOG.debug(" > " + s);
+ }
+ Utils.writeOutput(values, new Path(work.resultTempPath), conf);
+ }
+
+ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+ Long lastReplId; // get list of events matching dbPattern & tblPattern
+ // go through each event, and dump out each event to a event-level dump dir inside dumproot
+
+ // TODO : instead of simply restricting by message format, we should eventually
+ // move to a jdbc-driver-style registering of message format, and picking message
+ // factory per event to decode. For now, however, since all messages have the
+ // same factory, restricting by message format is effectively a guard against
+ // older leftover data that would cause us problems.
+
+ work.overrideEventTo(getHive());
+
+ IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
+ new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
+ new EventBoundaryFilter(work.eventFrom, work.eventTo),
+ new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
+
+ EventUtils.MSClientNotificationFetcher evFetcher
+ = new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, work.eventFrom, work.maxEventLimit(), evFilter);
+
+ lastReplId = work.eventTo;
+ String dbName = (null != work.dbNameOrPattern && !work.dbNameOrPattern.isEmpty())
+ ? work.dbNameOrPattern
+ : "?";
+ REPL_STATE_LOG
+ .info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL", dbName);
+ while (evIter.hasNext()) {
+ NotificationEvent ev = evIter.next();
+ lastReplId = ev.getEventId();
+ Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
+ dumpEvent(ev, evRoot, cmRoot);
+ }
+
+ REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}", dbName);
+
+ LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
+ Utils.writeOutput(
+ Arrays.asList(
+ "incremental",
+ String.valueOf(work.eventFrom),
+ String.valueOf(lastReplId)
+ ),
+ dmd.getDumpFilePath(), conf);
+ dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
+ dmd.write();
+ return lastReplId;
+ }
+
+ private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
+ EventHandler.Context context = new EventHandler.Context(
+ evRoot,
+ cmRoot,
+ getHive(),
+ conf,
+ getNewEventOnlyReplicationSpec(ev.getEventId())
+ );
+ EventHandlerFactory.handlerFor(ev).handle(context);
+ REPL_STATE_LOG.info(
+ "Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
+ String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
+ }
+
+ private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
+ return getNewReplicationSpec(eventId.toString(), eventId.toString());
+ }
+
+ private Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
+ // bootstrap case
+ Long bootDumpBeginReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+ for (String dbName : matchesDb()) {
+ REPL_STATE_LOG
+ .info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP",
+ dbName);
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+
+ Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+ dumpFunctionMetadata(dbName, dumpRoot);
+ for (String tblName : matchesTbl(dbName, work.tableNameOrPattern)) {
+ LOG.debug(
+ "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
+ dumpTable(dbName, tblName, dbRoot);
+ }
+ }
+ Long bootDumpEndReplId = getHive().getMSC().getCurrentNotificationEventId().getEventId();
+ LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId,
+ bootDumpEndReplId);
+
+ // Now that bootstrap has dumped all objects related, we have to account for the changes
+ // that occurred while bootstrap was happening - i.e. we have to look through all events
+ // during the bootstrap period and consolidate them with our dump.
+
+ IMetaStoreClient.NotificationFilter evFilter =
+ new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern);
+ EventUtils.MSClientNotificationFetcher evFetcher =
+ new EventUtils.MSClientNotificationFetcher(getHive().getMSC());
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, bootDumpBeginReplId,
+ Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
+ evFilter);
+
+ // Now we consolidate all the events that happened during the objdump into the objdump
+ while (evIter.hasNext()) {
+ NotificationEvent ev = evIter.next();
+ Path eventRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+ // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
+ }
+ LOG.info(
+ "Consolidation done, preparing to return {},{}->{}",
+ dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+ dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
+ dmd.write();
+
+ // Set the correct last repl id to return to the user
+ return bootDumpEndReplId;
+ }
+
+ private Iterable<? extends String> matchesDb() throws HiveException {
+ if (work.dbNameOrPattern == null) {
+ return getHive().getAllDatabases();
+ } else {
+ return getHive().getDatabasesByPattern(work.dbNameOrPattern);
+ }
+ }
+
+ private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+ throws HiveException {
+ Hive db = Hive.get();
+ if (tblPattern == null) {
+ return Collections2.filter(db.getAllTables(dbName),
+ tableName -> {
+ assert tableName != null;
+ return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
+ });
+ } else {
+ return db.getTablesByPattern(dbName, tblPattern);
+ }
+ }
+
+ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
+ Path dbRoot = new Path(dumpRoot, dbName);
+ // TODO : instantiating FS objects is generally costly. Refactor
+ FileSystem fs = dbRoot.getFileSystem(conf);
+ Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+ HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database();
+ EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
+ REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
+ return dbRoot;
+ }
+
+ private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception {
+ try {
+ Hive db = getHive();
+ TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+ TableExport.Paths exportPaths =
+ new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf);
+ new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
+ REPL_STATE_LOG.info(
+ "Repl Dump: Analyzed dump for table/view: {}.{} and dumping metadata and data to path {}",
+ dbName, tblName, exportPaths.exportRootDir.toString());
+ } catch (InvalidTableException te) {
+ // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+ // Just log a debug message and skip it.
+ LOG.debug(te.getMessage());
+ }
+ }
+
+ private ReplicationSpec getNewReplicationSpec() throws TException {
+ ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
+ rspec.setCurrentReplicationState(String.valueOf(getHive().getMSC()
+ .getCurrentNotificationEventId().getEventId()));
+ return rspec;
+ }
+
+ private ReplicationSpec getNewReplicationSpec(String evState, String objState) {
+ return new ReplicationSpec(true, false, evState, objState, false, true, true);
+ }
+
+ private String getNextDumpDir() {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+ // make it easy to write .q unit tests, instead of unique id generation.
+ // however, this does mean that in writing tests, we have to be aware that
+ // repl dump will clash with prior dumps, and thus have to clean up properly.
+ if (work.testInjectDumpDir == null) {
+ return "next";
+ } else {
+ return work.testInjectDumpDir;
+ }
+ } else {
+ return String.valueOf(System.currentTimeMillis());
+ // TODO: time good enough for now - we'll likely improve this.
+ // We may also work in something equivalent to a pid or thread id and move to nanos to ensure
+ // uniqueness.
+ }
+ }
+
+ private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws Exception {
+ Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
+ List<String> functionNames = getHive().getFunctions(dbName, "*");
+ for (String functionName : functionNames) {
+ HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName);
+ if (tuple == null) {
+ continue;
+ }
+ Path functionRoot = new Path(functionsRoot, functionName);
+ Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
+ try (JsonWriter jsonWriter =
+ new JsonWriter(functionMetadataRoot.getFileSystem(conf), functionMetadataRoot)) {
+ FunctionSerializer serializer = new FunctionSerializer(tuple.object, conf);
+ serializer.writeTo(jsonWriter, tuple.replicationSpec);
+ }
+ REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
+ }
+ }
+
+ private HiveWrapper.Tuple<Function> functionTuple(String functionName, String dbName) {
+ try {
+ HiveWrapper.Tuple<Function> tuple = new HiveWrapper(getHive(), dbName).function(functionName);
+ if (tuple.object.getResourceUris().isEmpty()) {
+ REPL_STATE_LOG.warn(
+ "Not replicating function: " + functionName + " as it seems to have been created "
+ + "without USING clause");
+ return null;
+ }
+ return tuple;
+ } catch (HiveException e) {
+ // This can happen because we query getFunctions before fetching the actual function;
+ // in between, a user may drop the function, in which case our call will fail.
+ LOG.info("Function " + functionName
+ + " could not be found, we are ignoring it as it can be a valid state ", e);
+ return null;
+ }
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.REPLDUMP;
+ }
+}
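ReplDumpTask writes its two return values, the dump directory and the last replication id (per the dump_dir,last_repl_id#string,string schema), to the temp path carried in ReplDumpWork, and the fetch task created by the analyzer surfaces them as the query result. A rough consumer-side sketch, assuming the same Driver API the tests in this patch import; the database name is a placeholder:

    Driver driver = new Driver(conf);
    driver.run("REPL DUMP somedb");
    List<String> result = new ArrayList<>();
    driver.getResults(result);
    // One row with two tab-separated columns: <dump_dir> <last_repl_id>
    String[] cols = result.get(0).split("\t");
    String dumpLocation = cols[0];
    String lastReplId = cols[1];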
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
new file mode 100644
index 0000000..1f32be9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -0,0 +1,75 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import com.google.common.primitives.Ints;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+
+@Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER,
+ Explain.Level.DEFAULT,
+ Explain.Level.EXTENDED })
+public class ReplDumpWork implements Serializable {
+ final String dbNameOrPattern, tableNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
+ final Long eventFrom;
+ Long eventTo;
+ private Integer maxEventLimit;
+ static String testInjectDumpDir = null;
+
+ public static void injectNextDumpDirForTest(String dumpDir) {
+ testInjectDumpDir = dumpDir;
+ }
+
+ public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern,
+ Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
+ String resultTempPath) {
+ this.dbNameOrPattern = dbNameOrPattern;
+ this.tableNameOrPattern = tableNameOrPattern;
+ this.eventFrom = eventFrom;
+ this.eventTo = eventTo;
+ this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
+ this.maxEventLimit = maxEventLimit;
+ this.resultTempPath = resultTempPath;
+ }
+
+ boolean isBootStrapDump() {
+ return eventFrom == null;
+ }
+
+ int maxEventLimit() throws Exception {
+ if (eventTo < eventFrom) {
+ throw new Exception("Invalid event ID input received in TO clause");
+ }
+ Integer maxRange = Ints.checkedCast(this.eventTo - eventFrom + 1);
+ if ((maxEventLimit == null) || (maxEventLimit > maxRange)) {
+ maxEventLimit = maxRange;
+ }
+ return maxEventLimit;
+ }
+
+ void overrideEventTo(Hive fromDb) throws Exception {
+ if (eventTo == null) {
+ eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId();
+ LoggerFactory.getLogger(this.getClass())
+ .debug("eventTo not specified, using current event id : {}", eventTo);
+ }
+ }
+}
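ReplDumpWork also carries the bootstrap-vs-incremental decision and the event-limit capping that used to be inlined in the analyzer. A small illustration of those two package-private helpers; the values are made up and the calls would have to live in the org.apache.hadoop.hive.ql.exec.repl package:

    // eventFrom == null means a full bootstrap dump.
    ReplDumpWork bootstrap = new ReplDumpWork("somedb", null, null, null,
        "REPL DUMP somedb", null, "/tmp/result");
    assert bootstrap.isBootStrapDump();

    // With a FROM/TO range, the requested limit is capped at eventTo - eventFrom + 1.
    ReplDumpWork incremental = new ReplDumpWork("somedb", null, 100L, 110L,
        "REPL DUMP somedb FROM 100 TO 110", 1000, "/tmp/result");
    assert !incremental.isBootStrapDump();
    assert incremental.maxEventLimit() == 11;   // throws if eventTo < eventFrom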
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index fdf6c3c..0932dff 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.parse;
import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -70,7 +71,8 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
// initialize export path
String tmpPath = stripQuotes(toTree.getText());
// All parsing is done, we're now good to start the export process.
- TableExport.Paths exportPaths = new TableExport.Paths(ast, tmpPath, conf);
+ TableExport.Paths exportPaths =
+ new TableExport.Paths(ErrorMsg.INVALID_PATH.getMsg(ast), tmpPath, conf);
TableExport.AuthEntities authEntities =
new TableExport(exportPaths, ts, replicationSpec, db, conf, LOG).run();
inputs.addAll(authEntities.inputs);
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 3d0c736..48d9c94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,40 +19,22 @@ package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
-import com.google.common.primitives.Ints;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
-import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.DatabaseAndTableFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
-import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.repl.DumpType;
-import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
-import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;
import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;
-import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandlerFactory;
-import org.apache.hadoop.hive.ql.parse.repl.dump.io.FunctionSerializer;
-import org.apache.hadoop.hive.ql.parse.repl.dump.io.JsonWriter;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
import org.apache.hadoop.hive.ql.parse.repl.load.EventDumpDirComparator;
import org.apache.hadoop.hive.ql.parse.repl.load.MetaData;
@@ -101,7 +83,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
private static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
- private static final String FUNCTION_METADATA_DIR_NAME = "_metadata";
private final static Logger REPL_STATE_LOG = LoggerFactory.getLogger("ReplState");
ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
@@ -180,117 +161,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+ "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+ String.valueOf(eventTo) + " maxEventLimit " + String.valueOf(maxEventLimit));
- String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
- Path dumpRoot = new Path(replRoot, getNextDumpDir());
- DumpMetaData dmd = new DumpMetaData(dumpRoot, conf);
- Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
- Long lastReplId;
try {
- if (eventFrom == null){
- // bootstrap case
- Long bootDumpBeginReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
- for (String dbName : matchesDb(dbNameOrPattern)) {
- REPL_STATE_LOG.info("Repl Dump: Started analyzing Repl Dump for DB: {}, Dump Type: BOOTSTRAP", dbName);
- LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
-
- Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
- dumpFunctionMetadata(dbName, dumpRoot);
- for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
- LOG.debug(
- "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
- dumpTable(ast, dbName, tblName, dbRoot);
- }
- REPL_STATE_LOG.info("Repl Dump: Completed analyzing Repl Dump for DB: {} and created {} COPY tasks to dump " +
- "metadata and data",
- dbName, rootTasks.size());
- }
- Long bootDumpEndReplId = db.getMSC().getCurrentNotificationEventId().getEventId();
- LOG.info("Bootstrap object dump phase took from {} to {}", bootDumpBeginReplId, bootDumpEndReplId);
-
- // Now that bootstrap has dumped all objects related, we have to account for the changes
- // that occurred while bootstrap was happening - i.e. we have to look through all events
- // during the bootstrap period and consolidate them with our dump.
-
- IMetaStoreClient.NotificationFilter evFilter =
- new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern);
- EventUtils.MSClientNotificationFetcher evFetcher =
- new EventUtils.MSClientNotificationFetcher(db.getMSC());
- EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
- evFetcher, bootDumpBeginReplId,
- Ints.checkedCast(bootDumpEndReplId - bootDumpBeginReplId) + 1,
- evFilter );
-
- // Now we consolidate all the events that happenned during the objdump into the objdump
- while (evIter.hasNext()){
- NotificationEvent ev = evIter.next();
- Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
- // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
- }
- LOG.info(
- "Consolidation done, preparing to return {},{}->{}",
- dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
- dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
- dmd.write();
-
- // Set the correct last repl id to return to the user
- lastReplId = bootDumpEndReplId;
- } else {
- // get list of events matching dbPattern & tblPattern
- // go through each event, and dump out each event to a event-level dump dir inside dumproot
- if (eventTo == null){
- eventTo = db.getMSC().getCurrentNotificationEventId().getEventId();
- LOG.debug("eventTo not specified, using current event id : {}", eventTo);
- } else if (eventTo < eventFrom) {
- throw new Exception("Invalid event ID input received in TO clause");
- }
-
- Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
- if ((maxEventLimit == null) || (maxEventLimit > maxRange)){
- maxEventLimit = maxRange;
- }
-
- // TODO : instead of simply restricting by message format, we should eventually
- // move to a jdbc-driver-stype registering of message format, and picking message
- // factory per event to decode. For now, however, since all messages have the
- // same factory, restricting by message format is effectively a guard against
- // older leftover data that would cause us problems.
-
- IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
- new DatabaseAndTableFilter(dbNameOrPattern, tblNameOrPattern),
- new EventBoundaryFilter(eventFrom, eventTo),
- new MessageFormatFilter(MessageFactory.getInstance().getMessageFormat()));
-
- EventUtils.MSClientNotificationFetcher evFetcher
- = new EventUtils.MSClientNotificationFetcher(db.getMSC());
-
- EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
- evFetcher, eventFrom, maxEventLimit, evFilter);
-
- lastReplId = eventTo;
- REPL_STATE_LOG.info("Repl Dump: Started Repl Dump for DB: {}, Dump Type: INCREMENTAL",
- (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
- while (evIter.hasNext()){
- NotificationEvent ev = evIter.next();
- lastReplId = ev.getEventId();
- Path evRoot = new Path(dumpRoot, String.valueOf(lastReplId));
- dumpEvent(ev, evRoot, cmRoot);
- }
-
- REPL_STATE_LOG.info("Repl Dump: Completed Repl Dump for DB: {}",
- (null != dbNameOrPattern && !dbNameOrPattern.isEmpty()) ? dbNameOrPattern : "?");
-
- LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), lastReplId);
- Utils.writeOutput(
- Arrays.asList(
- "incremental",
- String.valueOf(eventFrom),
- String.valueOf(lastReplId)
- ),
- dmd.getDumpFilePath(), conf);
- dmd.setDump(DumpType.INCREMENTAL, eventFrom, lastReplId, cmRoot);
- dmd.write();
- }
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(lastReplId)), dumpSchema);
+ ctx.setResFile(ctx.getLocalTmpPath());
+ Task<ReplDumpWork> replDumpWorkTask = TaskFactory
+ .get(new ReplDumpWork(
+ dbNameOrPattern,
+ tblNameOrPattern,
+ eventFrom,
+ eventTo,
+ ErrorMsg.INVALID_PATH.getMsg(ast),
+ maxEventLimit,
+ ctx.getResFile().toUri().toString()
+ ), conf);
+ rootTasks.add(replDumpWorkTask);
setFetchTask(createFetchTask(dumpSchema));
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
@@ -299,119 +182,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
- EventHandler.Context context = new EventHandler.Context(
- evRoot,
- cmRoot,
- db,
- conf,
- getNewEventOnlyReplicationSpec(ev.getEventId())
- );
- EventHandlerFactory.handlerFor(ev).handle(context);
- REPL_STATE_LOG.info("Repl Dump: Dumped event with ID: {}, Type: {} and dumped metadata and data to path {}",
- String.valueOf(ev.getEventId()), ev.getEventType(), evRoot.toUri().toString());
- }
-
- public static void injectNextDumpDirForTest(String dumpdir){
- testInjectDumpDir = dumpdir;
- }
-
- private String getNextDumpDir() {
- if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
- // make it easy to write .q unit tests, instead of unique id generation.
- // however, this does mean that in writing tests, we have to be aware that
- // repl dump will clash with prior dumps, and thus have to clean up properly.
- if (testInjectDumpDir == null){
- return "next";
- } else {
- return testInjectDumpDir;
- }
- } else {
- return String.valueOf(System.currentTimeMillis());
- // TODO: time good enough for now - we'll likely improve this.
- // We may also work in something the equivalent of pid, thrid and move to nanos to ensure
- // uniqueness.
- }
- }
-
- /**
- *
- * @param dbName
- * @param dumpRoot
- * @return db dumped path
- * @throws SemanticException
- */
- private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
- Path dbRoot = new Path(dumpRoot, dbName);
- try {
- // TODO : instantiating FS objects are generally costly. Refactor
- FileSystem fs = dbRoot.getFileSystem(conf);
- Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
- HiveWrapper.Tuple<Database> database = new HiveWrapper(db, dbName).database();
- inputs.add(new ReadEntity(database.object));
- EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
- REPL_STATE_LOG.info("Repl Dump: Dumped DB metadata");
- } catch (Exception e) {
- // TODO : simple wrap & rethrow for now, clean up with error codes
- throw new SemanticException(e);
- }
- return dbRoot;
- }
-
- private void dumpFunctionMetadata(String dbName, Path dumpRoot) throws SemanticException {
- Path functionsRoot = new Path(new Path(dumpRoot, dbName), FUNCTIONS_ROOT_DIR_NAME);
- try {
- List<String> functionNames = db.getFunctions(dbName, "*");
- for (String functionName : functionNames) {
- HiveWrapper.Tuple<Function> tuple;
- try {
- tuple = new HiveWrapper(db, dbName).function(functionName);
- } catch (HiveException e) {
- //This can happen as we are querying the getFunctions before we are getting the actual function
- //in between there can be a drop function by a user in which case our call will fail.
- LOG.info("Function " + functionName + " could not be found, we are ignoring it as it can be a valid state ", e);
- continue;
- }
- if (tuple.object.getResourceUris().isEmpty()) {
- REPL_STATE_LOG.warn(
- "Not replicating function: " + functionName + " as it seems to have been created "
- + "without USING clause");
- continue;
- }
-
- Path functionRoot = new Path(functionsRoot, functionName);
- Path functionMetadataRoot = new Path(functionRoot, FUNCTION_METADATA_DIR_NAME);
- try (JsonWriter jsonWriter = new JsonWriter(functionMetadataRoot.getFileSystem(conf),
- functionMetadataRoot)) {
- FunctionSerializer serializer =
- new FunctionSerializer(tuple.object, conf);
- serializer.writeTo(jsonWriter, tuple.replicationSpec);
- }
- REPL_STATE_LOG.info("Repl Dump: Dumped metadata for function: {}", functionName);
- }
- } catch (Exception e) {
- throw new SemanticException(e);
- }
- }
-
- private void dumpTable(ASTNode ast, String dbName, String tblName, Path dbRoot)
- throws SemanticException {
- try {
- TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
- TableExport.Paths exportPaths = new TableExport.Paths(ast, dbRoot, tblName, conf);
- new TableExport(exportPaths, ts, getNewReplicationSpec(), db, conf, LOG).run();
- REPL_STATE_LOG.info("Repl Dump: Analyzed dump for table/view: {}.{} and created copy tasks to dump metadata " +
- "and data to path {}", dbName, tblName, exportPaths.exportRootDir.toString());
- } catch (InvalidTableException te) {
- // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
- // Just log a debug message and skip it.
- LOG.debug(te.getMessage());
- } catch (HiveException e) {
- // TODO : simple wrap & rethrow for now, clean up with error codes
- throw new SemanticException(e);
- }
- }
-
// REPL LOAD
private void initReplLoad(ASTNode ast) {
int numChildren = ast.getChildCount();
@@ -953,53 +723,4 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
ctx.setResFile(ctx.getLocalTmpPath());
Utils.writeOutput(values, ctx.getResFile(), conf);
}
-
- private ReplicationSpec getNewReplicationSpec() throws SemanticException {
- try {
- ReplicationSpec rspec = getNewReplicationSpec("replv2", "will-be-set");
- rspec.setCurrentReplicationState(String.valueOf(db.getMSC()
- .getCurrentNotificationEventId().getEventId()));
- return rspec;
- } catch (Exception e) {
- throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes
- }
- }
-
- // Use for specifying object state as well as event state
- private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
- return new ReplicationSpec(true, false, evState, objState, false, true, true);
- }
-
- // Use for replication states focused on event only, where the obj state will be the event state
- private ReplicationSpec getNewEventOnlyReplicationSpec(Long eventId) throws SemanticException {
- return getNewReplicationSpec(eventId.toString(), eventId.toString());
- }
-
- private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
- throws HiveException {
- if (tblPattern == null) {
- return removeValuesTemporaryTables(db.getAllTables(dbName));
- } else {
- return db.getTablesByPattern(dbName, tblPattern);
- }
- }
-
- private final static String TMP_TABLE_PREFIX =
- SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX.toLowerCase();
-
- static Iterable<String> removeValuesTemporaryTables(List<String> tableNames) {
- return Collections2.filter(tableNames,
- tableName -> {
- assert tableName != null;
- return !tableName.toLowerCase().startsWith(TMP_TABLE_PREFIX);
- });
- }
-
- private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
- if (dbPattern == null) {
- return db.getAllDatabases();
- } else {
- return db.getDatabasesByPattern(dbPattern);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 144d667..9f22f23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -62,6 +61,9 @@ public class TableExport {
? null
: tableSpec;
this.replicationSpec = replicationSpec;
+ if (this.tableSpec != null && this.tableSpec.tableHandle.isView()) {
+ this.replicationSpec.setIsMetadataOnly(true);
+ }
this.db = db;
this.conf = conf;
this.logger = logger;
@@ -72,6 +74,7 @@ public class TableExport {
if (tableSpec == null) {
writeMetaData(null);
} else if (shouldExport()) {
+ //first we should get the correct replication spec before doing metadata/data export
if (tableSpec.tableHandle.isView()) {
replicationSpec.setIsMetadataOnly(true);
}
@@ -111,7 +114,8 @@ public class TableExport {
}
}
- private void writeMetaData(PartitionIterable partitions) throws SemanticException {
+ private void writeMetaData(PartitionIterable partitions)
+ throws SemanticException {
try {
EximUtil.createExportDump(
paths.exportFileSystem,
@@ -168,13 +172,14 @@ public class TableExport {
* directory creation.
*/
public static class Paths {
- private final ASTNode ast;
+ private final String astRepresentationForErrorMsg;
private final HiveConf conf;
public final Path exportRootDir;
private final FileSystem exportFileSystem;
- public Paths(ASTNode ast, Path dbRoot, String tblName, HiveConf conf) throws SemanticException {
- this.ast = ast;
+ public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName,
+ HiveConf conf) throws SemanticException {
+ this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
this.conf = conf;
Path tableRoot = new Path(dbRoot, tblName);
URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
@@ -190,8 +195,9 @@ public class TableExport {
}
}
- public Paths(ASTNode ast, String path, HiveConf conf) throws SemanticException {
- this.ast = ast;
+ public Paths(String astRepresentationForErrorMsg, String path, HiveConf conf)
+ throws SemanticException {
+ this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
this.conf = conf;
this.exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
try {
@@ -245,21 +251,21 @@ public class TableExport {
FileStatus tgt = fs.getFileStatus(toPath);
// target exists
if (!tgt.isDirectory()) {
- throw new SemanticException(ErrorMsg.INVALID_PATH
- .getMsg(ast, "Target is not a directory : " + rootDirExportFile));
+ throw new SemanticException(
+ astRepresentationForErrorMsg + ": " + "Target is not a directory : "
+ + rootDirExportFile);
} else {
FileStatus[] files = fs.listStatus(toPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (files != null && files.length != 0) {
throw new SemanticException(
- ErrorMsg.INVALID_PATH
- .getMsg(ast, "Target is not an empty directory : " + rootDirExportFile)
- );
+ astRepresentationForErrorMsg + ": " + "Target is not an empty directory : "
+ + rootDirExportFile);
}
}
} catch (FileNotFoundException ignored) {
}
} catch (IOException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ast), e);
+ throw new SemanticException(astRepresentationForErrorMsg, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c39b8795/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 1cb4470..17cf4d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -1,33 +1,28 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
*/
package org.apache.hadoop.hive.ql.parse;
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.*;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -45,14 +40,15 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
public class TestReplicationSemanticAnalyzer {
- static QueryState queryState;
+ private static QueryState queryState;
static HiveConf conf;
- static String defaultDB = "default";
- static String tblName = "testReplSA";
- static ArrayList<String> cols = new ArrayList<String>(Arrays.asList("col1", "col2"));
- ParseDriver pd;
- SemanticAnalyzer sA;
+ private static String defaultDB = "default";
+ private static String tblName = "testReplSA";
+ private static ArrayList<String> cols = new ArrayList<String>(Arrays.asList("col1", "col2"));
@BeforeClass
public static void initialize() throws HiveException {
@@ -221,12 +217,6 @@ public class TestReplicationSemanticAnalyzer {
assertEquals(child.getChildCount(), 0);
}
- // TODO: add this test after repl dump analyze generates tasks
- //@Test
- public void testReplDumpAnalyze() throws Exception {
-
- }
-
//@Test
public void testReplLoadAnalyze() throws Exception {
ParseDriver pd = new ParseDriver();
@@ -274,19 +264,4 @@ public class TestReplicationSemanticAnalyzer {
FetchTask fetchTask = rs.getFetchTask();
assertNotNull(fetchTask);
}
-
- @Test
- public void removeTemporaryTablesForMetadataDump() {
- List<String> validTables = ImmutableList.copyOf(
- ReplicationSemanticAnalyzer.removeValuesTemporaryTables(new ArrayList<String>() {{
- add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "a");
- add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "b");
- add(SemanticAnalyzer.VALUES_TMP_TABLE_NAME_PREFIX + "c");
- add("c");
- add("b");
- add("a");
- }}));
- assertThat(validTables.size(), is(equalTo(3)));
- assertThat(validTables, hasItems("a", "b", "c"));
- }
}