Posted to commits@hive.apache.org by se...@apache.org on 2016/11/15 20:21:03 UTC
[31/50] [abbrv] hive git commit: HIVE-15151: Bootstrap support for
replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..a4dfa3a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+ // Database name or pattern
+ private String dbNameOrPattern;
+ // Table name or pattern
+ private String tblNameOrPattern;
+ private Integer eventFrom;
+ private Integer eventTo;
+ private Integer batchSize;
+ // Base path for REPL LOAD
+ private String path;
+
+ public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyzeInternal(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplicationSemanticAanalyzer: analyzeInternal");
+ LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText());
+ switch (ast.getToken().getType()) {
+ case TOK_REPL_DUMP: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
+ initReplDump(ast);
+ analyzeReplDump(ast);
+ break;
+ }
+ case TOK_REPL_LOAD: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
+ initReplLoad(ast);
+ analyzeReplLoad(ast);
+ break;
+ }
+ case TOK_REPL_STATUS: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
+ initReplStatus(ast);
+ analyzeReplStatus(ast);
+ break;
+ }
+ default: {
+ throw new SemanticException("Unexpected root token");
+ }
+ }
+ }
+
+ private void initReplDump(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ // skip the first node, which is always required
+ int currNode = 1;
+ while (currNode < numChildren) {
+ if (ast.getChild(currNode).getType() != TOK_FROM) {
+ // optional tblName was specified.
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText());
+ } else {
+ // TOK_FROM subtree
+ Tree fromNode = ast.getChild(currNode);
+ eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+ // skip the first, which is always required
+ int numChild = 1;
+ while (numChild < fromNode.getChildCount()) {
+ if (fromNode.getChild(numChild).getType() == TOK_TO) {
+ eventTo =
+ Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+ // skip the next child, since we already took care of it
+ numChild++;
+ } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
+ batchSize =
+ Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+ // skip the next child, since we already took care of it
+ numChild++;
+ }
+ // move to the next child in FROM tree
+ numChild++;
+ }
+ // FROM node is always the last
+ break;
+ }
+ // move to the next root node
+ currNode++;
+ }
+ }
+
+ // REPL DUMP
+ private void analyzeReplDump(ASTNode ast) throws SemanticException {
+ // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+ + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+ + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ Path dumpRoot = new Path(replRoot, getNextDumpDir());
+ try {
+ for (String dbName : matchesDb(dbNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+ Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+ for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+ + " to db root " + dbRoot.toUri());
+ dumpTbl(ast, dbName, tblName, dbRoot);
+ }
+ }
+ String currentReplId =
+ String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
+ "dump_dir,last_repl_id#string,string");
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ }
+
+ String getNextDumpDir() {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+ return "next";
+ // make it easy to write unit tests, instead of unique id generation.
+ // however, this does mean that in writing tests, we have to be aware that
+ // repl dump will clash with prior dumps, and thus have to clean up properly.
+ } else {
+ return String.valueOf(System.currentTimeMillis());
+ // TODO: time is good enough for now - we'll likely improve this.
+ // We may also work in the equivalent of a pid or thread id, and move to nanos, to ensure
+ // uniqueness.
+ }
+ }
+
+ /**
+ * Dumps the metadata for the given database under the dump root.
+ *
+ * @param dbName name of the database to dump
+ * @param dumpRoot root dir of the current dump
+ * @return path the db was dumped to
+ * @throws SemanticException
+ */
+ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
+ Path dbRoot = new Path(dumpRoot, dbName);
+ try {
+ // TODO : instantiating FS objects are generally costly. Refactor
+ FileSystem fs = dbRoot.getFileSystem(conf);
+ Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+ Database dbObj = db.getDatabase(dbName);
+ EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ return dbRoot;
+ }
+
+ /**
+ * Dumps the given table under its database's dump dir, via an EXPORT-equivalent.
+ *
+ * @param ast the REPL DUMP node, used to set up the export tasks
+ * @param dbName name of the database the table lives in
+ * @param tblName name of the table to dump
+ * @param dbRoot dump dir of the enclosing database
+ * @return path the table was dumped to
+ * @throws SemanticException
+ */
+ private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
+ Path tableRoot = new Path(dbRoot, tblName);
+ try {
+ URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
+ TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+ ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
+ rootTasks, inputs, outputs, LOG);
+ } catch (HiveException e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ return tableRoot;
+ }
+
+ // REPL LOAD
+ private void initReplLoad(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ if (numChildren > 1) {
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+ }
+ if (numChildren > 2) {
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+ }
+ }
+
+ /*
+ * Example dump dirs we need to be able to handle:
+ *
+ * for: hive.repl.rootdir = staging/
+ * Then, repl dumps will be created in staging/<dumpdir>
+ *
+ * single-db-dump: staging/blah12345
+ *   blah12345/
+ *     default/
+ *       _metadata
+ *       tbl1/
+ *         _metadata
+ *         dt=20160907/
+ *           _files
+ *       tbl2/
+ *       tbl3/
+ *       unptn_tbl/
+ *         _metadata
+ *         _files
+ *
+ * multi-db-dump: staging/bar12347
+ *   bar12347/
+ *     default/
+ *       ...
+ *     sales/
+ *       ...
+ *
+ * single table-dump: staging/baz123
+ *   baz123/
+ *     _metadata
+ *     dt=20150931/
+ *       _files
+ */
+ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
+ + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
+
+ // for analyze repl load, we walk through the dir structure available in the path,
+ // looking at each db, and then each table, and then setting up the appropriate
+ // import job in its place.
+
+ // FIXME : handle non-bootstrap cases.
+
+ // We look at the path, and go through each subdir.
+ // Each subdir corresponds to a database.
+ // For each subdir, there is a _metadata file which allows us to re-create the db object.
+ // After each db object is loaded appropriately, we iterate through the sub-table dirs, and
+ // pretend that we had an IMPORT on each of them, into this db.
+
+ try {
+
+ Path loadPath = new Path(path);
+ final FileSystem fs = loadPath.getFileSystem(conf);
+
+ if (!fs.exists(loadPath)) {
+ // supposed dump path does not exist.
+ throw new FileNotFoundException(loadPath.toUri().toString());
+ }
+
+ // Now, the dumped path can be one of two things:
+ // a) It can be a db dump, in which case we expect a set of dirs, each with a
+ // db name, and with a _metadata file in each, and table dirs inside that.
+ // b) It can be a table dump dir, in which case we expect a _metadata dump of
+ // a table in question in the dir, and individual ptn dir hierarchy.
+ // Once we expand this into doing incremental repl, we can have individual events which can
+ // be other things like roles and functions as well. Also, if tblname is specified, we're
+ // guaranteed that this is a tbl-level dump, and it is an error condition if we find anything
+ // else. Also, if dbname is specified, we expect exactly one db dumped, and having more is an
+ // error condition.
+
+ if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+ analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+ return;
+ }
+
+ FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
+ if (srcs == null || (srcs.length == 0)) {
+ throw new FileNotFoundException(loadPath.toUri().toString());
+ }
+
+ FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
+
+ if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
+ throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
+ }
+
+ if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+ LOG.debug("Found multiple dirs when we expected 1:");
+ for (FileStatus d : dirsInLoadPath) {
+ LOG.debug("> " + d.getPath().toUri().toString());
+ }
+ throw new IllegalArgumentException(
+ "Multiple dirs in "
+ + loadPath.toUri().toString()
+ + " do not correspond to REPL LOAD, which expects to load to a singular destination point.");
+ }
+
+ for (FileStatus dir : dirsInLoadPath) {
+ analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+ }
+
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+
+ }
+
+ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
+ throws SemanticException {
+ try {
+ // Path being passed to us is a db dump location. We go ahead and load as needed.
+ // dbName might be null or empty, in which case we keep the original db name for the new
+ // database creation
+
+ // Two steps here - first, we read the _metadata file and create a CreateDatabaseDesc
+ // associated with that. Then, we iterate over all subdirs and create table imports for each.
+
+ EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ try {
+ rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+
+ Database dbObj = rv.getDatabase();
+
+ if (dbObj == null) {
+ throw new IllegalArgumentException(
+ "_metadata file read did not contain a db object - invalid dump.");
+ }
+
+ if ((dbName == null) || (dbName.isEmpty())) {
+ // We use the dbName specified as long as it is not null/empty. If it is, we fall back to
+ // the original name recorded in the thrift object.
+ dbName = dbObj.getName();
+ }
+
+ CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+ createDbDesc.setName(dbName);
+ createDbDesc.setComment(dbObj.getDescription());
+ createDbDesc.setDatabaseProperties(dbObj.getParameters());
+ // note that we do not set location - for repl load, we want that auto-created.
+
+ createDbDesc.setIfNotExists(false);
+ // If the db already exists, we want this to be an error condition. REPL LOAD is not intended
+ // to replace an existing db.
+ // TODO: we might revisit this in create-drop-recreate cases; needs some thinking.
+ Task<? extends Serializable> createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
+ rootTasks.add(createDbTask);
+
+ FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
+
+ for (FileStatus tableDir : dirsInDbPath) {
+ analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+ }
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ private void analyzeTableLoad(String dbName, String tblName, String locn,
+ Task<? extends Serializable> precursor) throws SemanticException {
+ // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+ // If tblName is null, we default to the table name specified in _metadata, which is good.
+ // If it is specified, that is the name we are intended to create the new table as.
+ if (dbName == null || dbName.isEmpty()) {
+ throw new SemanticException("Database name cannot be null for a table load");
+ }
+ try {
+ // no location set on repl loads
+ boolean isLocationSet = false;
+ // all repl imports are non-external
+ boolean isExternalSet = false;
+ // bootstrap loads are not partition level
+ boolean isPartSpecSet = false;
+ // repl loads are not partition level
+ LinkedHashMap<String, String> parsedPartSpec = null;
+ // no location for repl imports
+ String parsedLocation = null;
+ boolean waitOnCreateDb = false;
+ List<Task<? extends Serializable>> importTasks = null;
+ if (precursor == null) {
+ importTasks = rootTasks;
+ waitOnCreateDb = false;
+ } else {
+ importTasks = new ArrayList<Task<? extends Serializable>>();
+ waitOnCreateDb = true;
+ }
+ EximUtil.SemanticAnalyzerWrapperContext x =
+ new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
+ ctx);
+ ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+ waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+
+ if (precursor != null) {
+ for (Task<? extends Serializable> t : importTasks) {
+ precursor.addDependentTask(t);
+ }
+ }
+
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ // REPL STATUS
+ private void initReplStatus(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ if (numChildren > 1) {
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+ }
+ }
+
+ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
+ + "." + String.valueOf(tblNameOrPattern));
+
+ String replLastId = null;
+
+ try {
+ if (tblNameOrPattern != null) {
+ // Checking for status of table
+ Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern);
+ if (tbl != null) {
+ inputs.add(new ReadEntity(tbl));
+ Map<String, String> params = tbl.getParameters();
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ }
+ }
+ } else {
+ // Checking for status of a db
+ Database database = db.getDatabase(dbNameOrPattern);
+ if (database != null) {
+ inputs.add(new ReadEntity(database));
+ Map<String, String> params = database.getParameters();
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ }
+ }
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ }
+
+ LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
+ + ctx.getResFile());
+ prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+ }
+
+ private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+ LOG.debug("prepareReturnValues : " + schema);
+ for (String s : values) {
+ LOG.debug(" > " + s);
+ }
+
+ ctx.setResFile(ctx.getLocalTmpPath());
+ // FIXME : this should not be accessible by the user if we write to it from the frontend.
+ // Thus, we should Desc/Work this, otherwise there is a security issue here.
+ // Note: if we don't call ctx.setResFile, we get an NPE from the following code section.
+ // If we do call it, then FetchWork winds up treating the "table" here as a partitioned dir,
+ // which does not work either.
+
+ writeOutput(values);
+ }
+
+ private void writeOutput(List<String> values) throws SemanticException {
+ Path outputFile = ctx.getResFile();
+ FileSystem fs = null;
+ DataOutputStream outStream = null;
+ try {
+ fs = outputFile.getFileSystem(conf);
+ outStream = fs.create(outputFile);
+ outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+ for (int i = 1; i < values.size(); i++) {
+ outStream.write(Utilities.ctrlaCode);
+ outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+ }
+ outStream.write(Utilities.newLineCode);
+ } catch (IOException e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ } finally {
+ IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
+ // replace with this
+ }
+ }
+
+ private ReplicationSpec getNewReplicationSpec() throws SemanticException {
+ try {
+ ReplicationSpec replicationSpec =
+ new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
+ replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+ .getCurrentNotificationEventId().getEventId()));
+ return replicationSpec;
+ } catch (Exception e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ }
+ }
+
+ private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+ throws HiveException {
+ if (tblPattern == null) {
+ return db.getAllTables(dbName);
+ } else {
+ return db.getTablesByPattern(dbName, tblPattern);
+ }
+ }
+
+ private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
+ if (dbPattern == null) {
+ return db.getAllDatabases();
+ } else {
+ return db.getDatabasesByPattern(dbPattern);
+ }
+ }
+
+}
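The intended flow through the new analyzer mirrors what the test file further below exercises: parse the command, let SemanticAnalyzerFactory route the root token, then analyze. A minimal sketch in that style (conf setup and command strings are illustrative; exception handling is elided):

    HiveConf conf = new HiveConf();
    QueryState queryState = new QueryState(conf);
    SessionState.start(conf);

    // Child 0 of the parse result is the TOK_REPL_DUMP node.
    ParseDriver pd = new ParseDriver();
    ASTNode root = (ASTNode) pd.parse("repl dump default").getChild(0);

    // The factory routes TOK_REPL_DUMP/TOK_REPL_LOAD/TOK_REPL_STATUS here.
    BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, root);
    sem.analyze(root, new Context(conf));
    // REPL DUMP then returns (dump_dir, last_repl_id) through prepareReturnValues().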
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4668271..824cf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -42,6 +42,7 @@ public class ReplicationSpec {
private String eventId = null;
private String currStateId = null;
private boolean isNoop = false;
+ private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
// Key definitions related to replication
@@ -49,8 +50,9 @@ public class ReplicationSpec {
REPL_SCOPE("repl.scope"),
EVENT_ID("repl.event.id"),
CURR_STATE_ID("repl.last.id"),
- NOOP("repl.noop");
-
+ NOOP("repl.noop"),
+ LAZY("repl.lazy"),
+ ;
private final String keyName;
KEY(String s) {
@@ -102,32 +104,32 @@ public class ReplicationSpec {
this((ASTNode)null);
}
- public ReplicationSpec(
- boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState,
- String currentReplicationState, boolean isNoop){
+ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+ String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
this.isInReplicationScope = isInReplicationScope;
this.isMetadataOnly = isMetadataOnly;
this.eventId = eventReplicationState;
this.currStateId = currentReplicationState;
this.isNoop = isNoop;
+ this.isLazy = isLazy;
}
public ReplicationSpec(Function<String, String> keyFetcher) {
String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
this.isMetadataOnly = false;
this.isInReplicationScope = false;
- if (scope != null){
- if (scope.equalsIgnoreCase("metadata")){
+ if (scope != null) {
+ if (scope.equalsIgnoreCase("metadata")) {
this.isMetadataOnly = true;
this.isInReplicationScope = true;
- } else if (scope.equalsIgnoreCase("all")){
+ } else if (scope.equalsIgnoreCase("all")) {
this.isInReplicationScope = true;
}
}
this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
- this.isNoop = Boolean.parseBoolean(
- keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+ this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+ this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
}
/**
@@ -280,6 +282,21 @@ public class ReplicationSpec {
this.isNoop = isNoop;
}
+ /**
+ * @return whether or not the current replication action is set to be lazy
+ */
+ public boolean isLazy() {
+ return isLazy;
+ }
+
+ /**
+ * @param isLazy whether or not the current replication action should be lazy
+ */
+ public void setLazy(boolean isLazy){
+ this.isLazy = isLazy;
+ }
+
+
public String get(KEY key) {
switch (key){
case REPL_SCOPE:
@@ -297,6 +314,8 @@ public class ReplicationSpec {
return getCurrentReplicationState();
case NOOP:
return String.valueOf(isNoop());
+ case LAZY:
+ return String.valueOf(isLazy());
}
return null;
}
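The keyFetcher constructor above lets a ReplicationSpec be rehydrated from any string-keyed store, such as a table's or database's parameters. A minimal sketch, assuming the Function type the class already imports and an illustrative parameter map:

    final Map<String, String> params = new HashMap<String, String>();
    params.put("repl.scope", "all");   // KEY.REPL_SCOPE => in replication scope
    params.put("repl.last.id", "42");  // KEY.CURR_STATE_ID
    params.put("repl.lazy", "true");   // the KEY.LAZY flag added by this patch

    // The fetcher simply looks each key name up in the map.
    ReplicationSpec spec = new ReplicationSpec(new Function<String, String>() {
      @Override
      public String apply(String key) {
        return params.get(key);
      }
    });
    // spec.isLazy() == true; spec.get(ReplicationSpec.KEY.CURR_STATE_ID) == "42"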
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index ed01a31..520d3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -128,6 +128,10 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT);
commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK);
commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT);
+ commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
+ commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
+ commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
+
}
static {
@@ -185,6 +189,12 @@ public final class SemanticAnalyzerFactory {
return new ExportSemanticAnalyzer(queryState);
case HiveParser.TOK_IMPORT:
return new ImportSemanticAnalyzer(queryState);
+ case HiveParser.TOK_REPL_DUMP:
+ case HiveParser.TOK_REPL_LOAD:
+ case HiveParser.TOK_REPL_STATUS:
+ return new ReplicationSemanticAnalyzer(queryState);
case HiveParser.TOK_ALTERTABLE: {
Tree child = tree.getChild(1);
switch (child.getType()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
new file mode 100644
index 0000000..1932d60
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+/**
+ * Marker work for Replication - behaves similarly to CopyWork, but maps to ReplCopyTask,
+ * which, if so instructed, lists the files in the source and writes that list at the
+ * destination instead of copying them, falling back to copying when needed.
+ */
+@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplCopyWork extends CopyWork {
+
+ protected boolean copyFiles = true; // governs copy-or-list-files behaviour.
+ // If set to true, behaves identically to a CopyWork
+ // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files.
+ // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this.
+
+ /**
+ * TODO : Refactor
+ *
+ * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following:
+ *
+ * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy
+ * along data from the source to destination. If, however, listFilesOnOutput is set,
+ * then, instead of copying the individual files to the destination, it simply creates
+ * a file called _files on destination that contains the list of the original files
+ * that were intended to be copied. Thus, we do not actually copy the files at CopyWork
+ * time.
+ *
+ * The flip side of this behaviour happens when, instead, readListFromInput is set. This
+ * flag, if set, changes the source behaviour of this CopyTask, and instead of copying
+ * explicit files, this will then fall back to a behaviour wherein an _files is read from
+ * the source, and the files specified by the _files are then copied to the destination.
+ *
+ * This allows us a lazy-copy-on-source and a pull-from destination semantic that we want
+ * to use from replication.
+ *
+ * ==
+ *
+ * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set,
+ * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput,
+ * and will generate a _files file.
+ *
+ * As to the input, we simply decide on whether to use the lazy mode or not depending on the
+ * presence of a _files file on the input. If we see a _files on the input, we simply expand it
+ * to copy as needed. If we do not, we copy as normal.
+ *
+ */
+
+ protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour
+ // If set to true, it'll iterate over input files, and for each file in the input,
+ // it'll write out an additional line in a _files file in the output.
+ // If set to false, it'll behave as a traditional CopyTask.
+
+ protected boolean readListFromInput = false; // governs remote-fetch-input behaviour
+ // If set to true, we'll assume that the input has a _files file present which lists
+ // the actual input files to copy, and we'll pull each of those on read.
+ // If set to false, it'll behave as a traditional CopyTask.
+
+ public ReplCopyWork() {
+ }
+
+ public ReplCopyWork(final Path fromPath, final Path toPath) {
+ super(fromPath, toPath, true);
+ }
+
+ public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+ super(fromPath, toPath, errorOnSrcEmpty);
+ }
+
+ public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){
+ this.listFilesOnOutput = listFilesOnOutput;
+ }
+
+ public boolean getListFilesOnOutputBehaviour(){
+ return this.listFilesOnOutput;
+ }
+
+ public void setReadListFromInput(boolean readListFromInput){
+ this.readListFromInput = readListFromInput;
+ }
+
+ public boolean getReadListFromInput(){
+ return this.readListFromInput;
+ }
+
+ // specialization of getListFilesOnOutputBehaviour, with a FileStatus arg.
+ // We can fall back to the flag-based behaviour, or do additional pattern matching
+ // to decide that certain files should not be listed, and copied instead -
+ // _metadata files, for instance.
+ // Currently, we use this to make sure _metadata files are copied rather than listed,
+ // but we might decide that this is not the right place for it later on.
+ public boolean getListFilesOnOutputBehaviour(FileStatus f) {
+ if (f.getPath().toString().contains("_metadata")){
+ return false; // always copy _metadata files
+ }
+ return this.listFilesOnOutput;
+ }
+}
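To make the flags above concrete, this is roughly the branch structure a task consuming ReplCopyWork would follow. Illustrative only - it is not the actual ReplCopyTask, and the helper name and parameters are assumptions (the paths would really come from the work object):

    // Sketch of the list-vs-copy split that ReplCopyWork's flags govern.
    static void runReplCopy(ReplCopyWork work, Path fromPath, Path toPath, Configuration conf)
        throws IOException {
      FileSystem srcFs = fromPath.getFileSystem(conf);
      FileSystem dstFs = toPath.getFileSystem(conf);
      if (work.getReadListFromInput()) {
        // Source holds a _files manifest: each line names an actual file to pull.
        BufferedReader reader = new BufferedReader(
            new InputStreamReader(srcFs.open(new Path(fromPath, "_files"))));
        String line;
        while ((line = reader.readLine()) != null) {
          Path src = new Path(line);
          FileUtil.copy(src.getFileSystem(conf), src, dstFs, toPath, false, conf);
        }
        reader.close();
      } else if (work.getListFilesOnOutputBehaviour()) {
        // Lazy output: record source file URIs in _files instead of copying the data.
        FSDataOutputStream out = dstFs.create(new Path(toPath, "_files"));
        for (FileStatus f : srcFs.listStatus(fromPath)) {
          if (work.getListFilesOnOutputBehaviour(f)) {
            out.writeBytes(f.getPath().toUri().toString() + "\n");
          } else {
            FileUtil.copy(srcFs, f.getPath(), dstFs, toPath, false, conf); // e.g. _metadata
          }
        }
        out.close();
      } else {
        // Plain CopyTask behaviour.
        FileUtil.copy(srcFs, fromPath, dstFs, toPath, false, conf);
      }
    }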
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..54b2bd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestReplicationSemanticAnalyzer {
+ static QueryState queryState;
+ static HiveConf conf;
+ static String defaultDB = "default";
+ static String tblName = "testReplSA";
+ static ArrayList<String> cols = new ArrayList<String>(Arrays.asList("col1", "col2"));
+ ParseDriver pd;
+ SemanticAnalyzer sA;
+
+
+ @BeforeClass
+ public static void initialize() throws HiveException {
+ queryState = new QueryState(new HiveConf(SemanticAnalyzer.class));
+ conf = queryState.getConf();
+ conf.set("hive.security.authorization.manager", "");
+ SessionState.start(conf);
+ Hive hiveDb = Hive.get(conf);
+ hiveDb.createTable(defaultDB + "." + tblName, cols, null, OrcInputFormat.class, OrcOutputFormat.class);
+ Table t = hiveDb.getTable(tblName);
+ }
+
+ @AfterClass
+ public static void teardown() throws HiveException {
+ }
+
+ @Test
+ public void testReplDumpParse() throws Exception {
+ ParseDriver pd = new ParseDriver();
+ String fromEventId = "100";
+ String toEventId = "200";
+ String batchSize = "50";
+ ASTNode root;
+ ASTNode child;
+
+ String query = "repl dump " + defaultDB;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_DUMP");
+ assertEquals(root.getChildCount(), 1);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 2);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 1);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId;
+
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), "TOK_TO");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(2);
+ assertEquals(child.getText(), toEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ query =
+ "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId
+ + " batch " + batchSize;
+
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 5);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), "TOK_TO");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(2);
+ assertEquals(child.getText(), toEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(3);
+ assertEquals(child.getText(), "TOK_BATCH");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(4);
+ assertEquals(child.getText(), batchSize);
+ assertEquals(child.getChildCount(), 0);
+ }
+
+ @Test
+ public void testReplLoadParse() throws Exception {
+ // FileSystem fs = FileSystem.get(conf);
+ ParseDriver pd = new ParseDriver();
+ ASTNode root;
+ ASTNode child;
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ Path dumpRoot = new Path(replRoot, "next");
+ System.out.println(replRoot);
+ System.out.println(dumpRoot);
+ String newDB = "default_bak";
+
+ String query = "repl load from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_LOAD");
+ assertEquals(root.getChildCount(), 1);
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_LOAD");
+ assertEquals(root.getChildCount(), 2);
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+ assertEquals(child.getChildCount(), 0);
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), newDB);
+ assertEquals(child.getChildCount(), 0);
+ }
+
+ // TODO: add this test after repl dump analyze generates tasks
+ //@Test
+ public void testReplDumpAnalyze() throws Exception {
+
+ }
+
+ //@Test
+ public void testReplLoadAnalyze() throws Exception {
+ ParseDriver pd = new ParseDriver();
+ ASTNode root;
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ FileSystem fs = FileSystem.get(conf);
+ Path dumpRoot = new Path(replRoot, "next");
+ System.out.println(replRoot);
+ System.out.println(dumpRoot);
+ String newDB = "default_bak";
+
+ // First create a dump
+ String query = "repl dump " + defaultDB;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ ReplicationSemanticAnalyzer rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+
+ // Then analyze load
+ query = "repl load from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+ List<Task<? extends Serializable>> roots = rs.getRootTasks();
+ assertEquals(1, roots.size());
+
+ query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+ roots = rs.getRootTasks();
+ assertEquals(1, roots.size());
+ }
+}
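End to end, the bootstrap cycle these tests sketch out would be driven as below. A hedged sketch: db names and the dumpDir variable are illustrative, a configured metastore is assumed, and exception handling is elided.

    Driver driver = new Driver(conf);

    // 1. On the source: dump the db. Returns one row, "<dump_dir>\u0001<last_repl_id>"
    // (the separator is Utilities.ctrlaCode, per writeOutput above).
    driver.run("repl dump default");
    List<String> res = new ArrayList<String>();
    driver.getResults(res);

    // 2. Ship <dump_dir> to the destination warehouse (e.g. via distcp), then load it.
    driver.run("repl load default_bak from '" + dumpDir + "'");

    // 3. Verify the replicated state.
    driver.run("repl status default_bak"); // returns last_repl_id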
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 03515ea..0caa42a 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@exim_department
#### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile
+FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file