Posted to commits@hive.apache.org by vg...@apache.org on 2016/11/14 04:15:08 UTC
[1/2] hive git commit: HIVE-15151: Bootstrap support for replv2
(Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master 532e6a72e -> 739ac3af3
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..a4dfa3a
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -0,0 +1,533 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
+import org.apache.hadoop.hive.ql.plan.DDLWork;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.io.IOUtils;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hive.ql.parse.HiveParser.*;
+
+public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
+ // Database name or pattern
+ private String dbNameOrPattern;
+ // Table name or pattern
+ private String tblNameOrPattern;
+ private Integer eventFrom;
+ private Integer eventTo;
+ private Integer batchSize;
+ // Base path for REPL LOAD
+ private String path;
+
+ public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ public void analyzeInternal(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplicationSemanticAanalyzer: analyzeInternal");
+ LOG.debug(ast.getName() + ":" + ast.getToken().getText() + "=" + ast.getText());
+ switch (ast.getToken().getType()) {
+ case TOK_REPL_DUMP: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: dump");
+ initReplDump(ast);
+ analyzeReplDump(ast);
+ break;
+ }
+ case TOK_REPL_LOAD: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: load");
+ initReplLoad(ast);
+ analyzeReplLoad(ast);
+ break;
+ }
+ case TOK_REPL_STATUS: {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeInternal: status");
+ initReplStatus(ast);
+ analyzeReplStatus(ast);
+ break;
+ }
+ default: {
+ throw new SemanticException("Unexpected root token");
+ }
+ }
+ }
+
+ private void initReplDump(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ // skip the first node, which is always required
+ int currNode = 1;
+ while (currNode < numChildren) {
+ if (ast.getChild(currNode).getType() != TOK_FROM) {
+ // optional tblName was specified.
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(currNode).getText());
+ } else {
+ // TOK_FROM subtree
+ Tree fromNode = ast.getChild(currNode);
+ eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+ // skip the first, which is always required
+ int numChild = 1;
+ while (numChild < fromNode.getChildCount()) {
+ if (fromNode.getChild(numChild).getType() == TOK_TO) {
+ eventTo =
+ Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+ // skip the next child, since we already took care of it
+ numChild++;
+ } else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
+ batchSize =
+ Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+ // skip the next child, since we already took care of it
+ numChild++;
+ }
+ // move to the next child in FROM tree
+ numChild++;
+ }
+ // FROM node is always the last
+ break;
+ }
+ // move to the next root node
+ currNode++;
+ }
+ }
+
+ // REPL DUMP
+ private void analyzeReplDump(ASTNode ast) throws SemanticException {
+ // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+ + "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+ + String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ Path dumpRoot = new Path(replRoot, getNextDumpDir());
+ try {
+ for (String dbName : matchesDb(dbNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+ Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+ for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+ + " to db root " + dbRoot.toUri());
+ dumpTbl(ast, dbName, tblName, dbRoot);
+ }
+ }
+ String currentReplId =
+ String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
+ "dump_dir,last_repl_id#string,string");
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ }
+
+ String getNextDumpDir() {
+ if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+ return "next";
+ // make it easy to write unit tests, instead of unique id generation.
+ // however, this does mean that in writing tests, we have to be aware that
+ // repl dump will clash with prior dumps, and thus have to clean up properly.
+ } else {
+ return String.valueOf(System.currentTimeMillis());
+ // TODO: time is good enough for now - we'll likely improve this.
+ // We may also work in something equivalent to a pid and thread id, and move to nanos, to
+ // ensure uniqueness.
+ }
+ }
+
+ /**
+ * Dumps the db-level metadata (the db object) for a single database into the dump root.
+ *
+ * @param dbName name of the database being dumped
+ * @param dumpRoot root directory of the current dump
+ * @return db dumped path
+ * @throws SemanticException
+ */
+ private Path dumpDbMetadata(String dbName, Path dumpRoot) throws SemanticException {
+ Path dbRoot = new Path(dumpRoot, dbName);
+ try {
+ // TODO : instantiating FS objects is generally costly. Refactor
+ FileSystem fs = dbRoot.getFileSystem(conf);
+ Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
+ Database dbObj = db.getDatabase(dbName);
+ EximUtil.createDbExportDump(fs, dumpPath, dbObj, getNewReplicationSpec());
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ return dbRoot;
+ }
+
+ /**
+ * Dumps the metadata (and, for bootstrap, the file listing) of a single table into its
+ * parent db's dump directory, via the EXPORT machinery.
+ *
+ * @param ast the REPL DUMP AST node, passed through to export preparation
+ * @param dbName database the table belongs to
+ * @param tblName name of the table being dumped
+ * @param dbRoot dump directory of the parent database
+ * @return tbl dumped path
+ * @throws SemanticException
+ */
+ private Path dumpTbl(ASTNode ast, String dbName, String tblName, Path dbRoot) throws SemanticException {
+ Path tableRoot = new Path(dbRoot, tblName);
+ try {
+ URI toURI = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
+ TableSpec ts = new TableSpec(db, conf, dbName + "." + tblName, null);
+ ExportSemanticAnalyzer.prepareExport(ast, toURI, ts, getNewReplicationSpec(), db, conf, ctx,
+ rootTasks, inputs, outputs, LOG);
+ } catch (HiveException e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+ return tableRoot;
+ }
+
+ // REPL LOAD
+ private void initReplLoad(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ if (numChildren > 1) {
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+ }
+ if (numChildren > 2) {
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(2).getText());
+ }
+ }
+
+ /*
+ * Example dump dirs we need to be able to handle :
+ *
+ * for: hive.repl.rootdir = staging/
+ * Then, repl dumps will be created in staging/<dumpdir>
+ *
+ * single-db-dump: staging/blah12345
+ *   blah12345/
+ *    default/
+ *     _metadata
+ *     tbl1/
+ *      _metadata
+ *      dt=20160907/
+ *       _files
+ *     tbl2/
+ *     tbl3/
+ *     unptn_tbl/
+ *      _metadata
+ *      _files
+ *
+ * multi-db-dump: staging/bar12347
+ *   staging/
+ *    bar12347/
+ *     default/
+ *      ...
+ *     sales/
+ *      ...
+ *
+ * single table-dump: staging/baz123
+ *   staging/
+ *    baz123/
+ *     _metadata
+ *     dt=20150931/
+ *      _files
+ */
+ private void analyzeReplLoad(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplSemanticAnalyzer.analyzeReplLoad: " + String.valueOf(dbNameOrPattern) + "."
+ + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(path));
+
+ // for analyze repl load, we walk through the dir structure available in the path,
+ // looking at each db, and then each table, and then setting up the appropriate
+ // import job in its place.
+
+ // FIXME : handle non-bootstrap cases.
+
+ // We look at the path, and go through each subdir.
+ // Each subdir corresponds to a database.
+ // For each subdir, there is a _metadata file which allows us to recreate the db object.
+ // After each db object is loaded appropriately, iterate through the sub-table dirs, and pretend
+ // that we had an IMPORT on each of them, into this db.
+
+ try {
+
+ Path loadPath = new Path(path);
+ final FileSystem fs = loadPath.getFileSystem(conf);
+
+ if (!fs.exists(loadPath)) {
+ // supposed dump path does not exist.
+ throw new FileNotFoundException(loadPath.toUri().toString());
+ }
+
+ // Now, the dumped path can be one of two things:
+ // a) It can be a db dump, in which case we expect a set of dirs, each with a
+ // db name, and with a _metadata file in each, and table dirs inside that.
+ // b) It can be a table dump dir, in which case we expect a _metadata dump of
+ // a table in question in the dir, and individual ptn dir hierarchy.
+ // Once we expand this into doing incremental repl, we can have individual events which can
+ // be other things like roles and functions as well. Also, if tblname is specified, we're guaranteed
+ // that this is a tbl-level dump, and it is an error condition if we find anything else. Also,
+ // if dbname is specified, we expect exactly one db dumped, and having more is an error
+ // condition.
+
+ if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+ analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
+ return;
+ }
+
+ FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
+ if (srcs == null || (srcs.length == 0)) {
+ throw new FileNotFoundException(loadPath.toUri().toString());
+ }
+
+ FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
+
+ if ((dirsInLoadPath == null) || (dirsInLoadPath.length == 0)) {
+ throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
+ }
+
+ if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+ LOG.debug("Found multiple dirs when we expected 1:");
+ for (FileStatus d : dirsInLoadPath) {
+ LOG.debug("> " + d.getPath().toUri().toString());
+ }
+ throw new IllegalArgumentException(
+ "Multiple dirs in "
+ + loadPath.toUri().toString()
+ + " do not correspond to a REPL LOAD that expects to load to a single destination point.");
+ }
+
+ for (FileStatus dir : dirsInLoadPath) {
+ analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+ }
+
+ } catch (Exception e) {
+ // TODO : simple wrap & rethrow for now, clean up with error codes
+ throw new SemanticException(e);
+ }
+
+ }
+
+ private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
+ throws SemanticException {
+ try {
+ // Path being passed to us is a db dump location. We go ahead and load as needed.
+ // dbName might be null or empty, in which case we keep the original db name for the new
+ // database creation
+
+ // Two steps here - first, we read the _metadata file here, and create a CreateDatabaseDesc
+ // associated with that
+ // Then, we iterate over all subdirs, and create table imports for each.
+
+ EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ try {
+ rv = EximUtil.readMetaData(fs, new Path(dir.getPath(), EximUtil.METADATA_NAME));
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+
+ Database dbObj = rv.getDatabase();
+
+ if (dbObj == null) {
+ throw new IllegalArgumentException(
+ "_metadata file read did not contain a db object - invalid dump.");
+ }
+
+ if ((dbName == null) || (dbName.isEmpty())) {
+ // We use the dbName specified as long as it is not null/empty; otherwise, we fall back to
+ // the original name recorded in the thrift object.
+ dbName = dbObj.getName();
+ }
+
+ CreateDatabaseDesc createDbDesc = new CreateDatabaseDesc();
+ createDbDesc.setName(dbName);
+ createDbDesc.setComment(dbObj.getDescription());
+ createDbDesc.setDatabaseProperties(dbObj.getParameters());
+ // note that we do not set location - for repl load, we want that auto-created.
+
+ createDbDesc.setIfNotExists(false);
+ // If the db already exists, we want this to be an error condition: REPL LOAD is not
+ // intended to replace an existing db.
+ // TODO: we might revisit this in create-drop-recreate cases; needs some thought.
+ Task<? extends Serializable> createDbTask = TaskFactory.get(new DDLWork(inputs, outputs, createDbDesc), conf);
+ rootTasks.add(createDbTask);
+
+ FileStatus[] dirsInDbPath = fs.listStatus(dir.getPath(), EximUtil.getDirectoryFilter(fs));
+
+ for (FileStatus tableDir : dirsInDbPath) {
+ analyzeTableLoad(dbName, null, tableDir.getPath().toUri().toString(), createDbTask);
+ }
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ private void analyzeTableLoad(String dbName, String tblName, String locn,
+ Task<? extends Serializable> precursor) throws SemanticException {
+ // Path being passed to us is a table dump location. We go ahead and load it in as needed.
+ // If tblName is null, we default to the table name recorded in the dump's _metadata.
+ // If tblName is specified, that is the name we are intended to create the new table as.
+ if (dbName == null || dbName.isEmpty()) {
+ throw new SemanticException("Database name cannot be null for a table load");
+ }
+ try {
+ // no location set on repl loads
+ boolean isLocationSet = false;
+ // all repl imports are non-external
+ boolean isExternalSet = false;
+ // bootstrap loads are not partition level
+ boolean isPartSpecSet = false;
+ // repl loads are not partition level
+ LinkedHashMap<String, String> parsedPartSpec = null;
+ // no location for repl imports
+ String parsedLocation = null;
+ boolean waitOnCreateDb = false;
+ List<Task<? extends Serializable>> importTasks = null;
+ if (precursor == null) {
+ importTasks = rootTasks;
+ waitOnCreateDb = false;
+ } else {
+ importTasks = new ArrayList<Task<? extends Serializable>>();
+ waitOnCreateDb = true;
+ }
+ EximUtil.SemanticAnalyzerWrapperContext x =
+ new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, importTasks, LOG,
+ ctx);
+ ImportSemanticAnalyzer.prepareImport(isLocationSet, isExternalSet, isPartSpecSet,
+ waitOnCreateDb, parsedLocation, tblName, dbName, parsedPartSpec, locn, x);
+
+ if (precursor != null) {
+ for (Task<? extends Serializable> t : importTasks) {
+ precursor.addDependentTask(t);
+ }
+ }
+
+ } catch (Exception e) {
+ throw new SemanticException(e);
+ }
+ }
+
+ // REPL STATUS
+ private void initReplStatus(ASTNode ast) {
+ int numChildren = ast.getChildCount();
+ dbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
+ if (numChildren > 1) {
+ tblNameOrPattern = PlanUtils.stripQuotes(ast.getChild(1).getText());
+ }
+ }
+
+ private void analyzeReplStatus(ASTNode ast) throws SemanticException {
+ LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: " + String.valueOf(dbNameOrPattern)
+ + "." + String.valueOf(tblNameOrPattern));
+
+ String replLastId = null;
+
+ try {
+ if (tblNameOrPattern != null) {
+ // Checking for status of table
+ Table tbl = db.getTable(dbNameOrPattern, tblNameOrPattern);
+ if (tbl != null) {
+ inputs.add(new ReadEntity(tbl));
+ Map<String, String> params = tbl.getParameters();
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ }
+ }
+ } else {
+ // Checking for status of a db
+ Database database = db.getDatabase(dbNameOrPattern);
+ if (database != null) {
+ inputs.add(new ReadEntity(database));
+ Map<String, String> params = database.getParameters();
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID))) {
+ replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID);
+ }
+ }
+ }
+ } catch (HiveException e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ }
+
+ LOG.debug("RSTATUS: writing repl.last.id=" + String.valueOf(replLastId) + " out to "
+ + ctx.getResFile());
+ prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+ }
+
+ private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
+ LOG.debug("prepareReturnValues : " + schema);
+ for (String s : values) {
+ LOG.debug(" > " + s);
+ }
+
+ ctx.setResFile(ctx.getLocalTmpPath());
+ // FIXME : this should not be accessible by the user if we write to it from the frontend.
+ // Thus, we should Desc/Work this, otherwise there is a security issue here.
+ // Note: if we don't call ctx.setResFile, we get an NPE from the following code section.
+ // If we do call it, then FetchWork winds up treating the "table" here as a partitioned
+ // dir, which does not work either. Thus, this does not work cleanly yet.
+
+ writeOutput(values);
+ }
+
+ private void writeOutput(List<String> values) throws SemanticException {
+ Path outputFile = ctx.getResFile();
+ FileSystem fs = null;
+ DataOutputStream outStream = null;
+ try {
+ fs = outputFile.getFileSystem(conf);
+ outStream = fs.create(outputFile);
+ outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
+ for (int i = 1; i < values.size(); i++) {
+ outStream.write(Utilities.ctrlaCode);
+ outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
+ }
+ outStream.write(Utilities.newLineCode);
+ } catch (IOException e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ } finally {
+ IOUtils.closeStream(outStream); // TODO : we have other closes here, and in ReplCopyTask -
+ // replace with this
+ }
+ }
+
+ private ReplicationSpec getNewReplicationSpec() throws SemanticException {
+ try {
+ ReplicationSpec replicationSpec =
+ new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
+ replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+ .getCurrentNotificationEventId().getEventId()));
+ return replicationSpec;
+ } catch (Exception e) {
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
+ // codes
+ }
+ }
+
+ private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
+ throws HiveException {
+ if (tblPattern == null) {
+ return db.getAllTables(dbName);
+ } else {
+ return db.getTablesByPattern(dbName, tblPattern);
+ }
+ }
+
+ private Iterable<? extends String> matchesDb(String dbPattern) throws HiveException {
+ if (dbPattern == null) {
+ return db.getAllDatabases();
+ } else {
+ return db.getDatabasesByPattern(dbPattern);
+ }
+ }
+
+}
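
For orientation, a minimal sketch of how the new statements are expected to be driven end to
end. It is not part of the patch: the database names, the dump path, and the use of
Driver.run() are illustrative assumptions, loosely modeled on TestReplicationSemanticAnalyzer
further down in this commit.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Driver;
    import org.apache.hadoop.hive.ql.session.SessionState;

    public class ReplBootstrapExample {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        SessionState.start(conf);
        Driver driver = new Driver(conf);
        // Bootstrap dump of a whole database; returns dump_dir and last_repl_id.
        driver.run("REPL DUMP testdb");
        // Load the dump into a (new) database on the destination warehouse.
        driver.run("REPL LOAD testdb_copy FROM '/user/hive/repl/next'");
        // Read back the last replicated state id recorded on the destination db.
        driver.run("REPL STATUS testdb_copy");
      }
    }
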
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 4668271..824cf11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -42,6 +42,7 @@ public class ReplicationSpec {
private String eventId = null;
private String currStateId = null;
private boolean isNoop = false;
+ private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
// Key definitions related to replication
@@ -49,8 +50,9 @@ public class ReplicationSpec {
REPL_SCOPE("repl.scope"),
EVENT_ID("repl.event.id"),
CURR_STATE_ID("repl.last.id"),
- NOOP("repl.noop");
-
+ NOOP("repl.noop"),
+ LAZY("repl.lazy"),
+ ;
private final String keyName;
KEY(String s) {
@@ -102,32 +104,32 @@ public class ReplicationSpec {
this((ASTNode)null);
}
- public ReplicationSpec(
- boolean isInReplicationScope, boolean isMetadataOnly, String eventReplicationState,
- String currentReplicationState, boolean isNoop){
+ public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
+ String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
this.isInReplicationScope = isInReplicationScope;
this.isMetadataOnly = isMetadataOnly;
this.eventId = eventReplicationState;
this.currStateId = currentReplicationState;
this.isNoop = isNoop;
+ this.isLazy = isLazy;
}
public ReplicationSpec(Function<String, String> keyFetcher) {
String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
this.isMetadataOnly = false;
this.isInReplicationScope = false;
- if (scope != null){
- if (scope.equalsIgnoreCase("metadata")){
+ if (scope != null) {
+ if (scope.equalsIgnoreCase("metadata")) {
this.isMetadataOnly = true;
this.isInReplicationScope = true;
- } else if (scope.equalsIgnoreCase("all")){
+ } else if (scope.equalsIgnoreCase("all")) {
this.isInReplicationScope = true;
}
}
this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
- this.isNoop = Boolean.parseBoolean(
- keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+ this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
+ this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
}
/**
@@ -280,6 +282,21 @@ public class ReplicationSpec {
this.isNoop = isNoop;
}
+ /**
+ * @return whether or not the current replication action is set to be lazy
+ */
+ public boolean isLazy() {
+ return isLazy;
+ }
+
+ /**
+ * @param isLazy whether or not the current replication action should be lazy
+ */
+ public void setLazy(boolean isLazy){
+ this.isLazy = isLazy;
+ }
+
+
public String get(KEY key) {
switch (key){
case REPL_SCOPE:
@@ -297,6 +314,8 @@ public class ReplicationSpec {
return getCurrentReplicationState();
case NOOP:
return String.valueOf(isNoop());
+ case LAZY:
+ return String.valueOf(isLazy());
}
return null;
}
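
To illustrate the new flag in isolation, a small self-contained example (the class name is made
up; the constructor argument order follows the new six-argument constructor above):

    import org.apache.hadoop.hive.ql.parse.ReplicationSpec;

    public class ReplicationSpecLazyExample {
      public static void main(String[] args) {
        // isInReplicationScope, isMetadataOnly, eventId, currStateId, isNoop, isLazy
        ReplicationSpec spec =
            new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
        System.out.println(spec.isLazy());                       // true
        System.out.println(spec.get(ReplicationSpec.KEY.LAZY));  // "true" (repl.lazy)
        spec.setLazy(false);                                     // the flag can be flipped later
      }
    }
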
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index ed01a31..520d3de 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -128,6 +128,10 @@ public final class SemanticAnalyzerFactory {
commandType.put(HiveParser.TOK_COMMIT, HiveOperation.COMMIT);
commandType.put(HiveParser.TOK_ROLLBACK, HiveOperation.ROLLBACK);
commandType.put(HiveParser.TOK_SET_AUTOCOMMIT, HiveOperation.SET_AUTOCOMMIT);
+ commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.EXPORT); // piggyback on EXPORT security handling for now
+ commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.IMPORT); // piggyback on IMPORT security handling for now
+ commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.SHOW_TBLPROPERTIES); // TODO : also actually DESCDATABASE
+
}
static {
@@ -185,6 +189,12 @@ public final class SemanticAnalyzerFactory {
return new ExportSemanticAnalyzer(queryState);
case HiveParser.TOK_IMPORT:
return new ImportSemanticAnalyzer(queryState);
+ case HiveParser.TOK_REPL_DUMP:
+ return new ReplicationSemanticAnalyzer(queryState);
+ case HiveParser.TOK_REPL_LOAD:
+ return new ReplicationSemanticAnalyzer(queryState);
+ case HiveParser.TOK_REPL_STATUS:
+ return new ReplicationSemanticAnalyzer(queryState);
case HiveParser.TOK_ALTERTABLE: {
Tree child = tree.getChild(1);
switch (child.getType()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
new file mode 100644
index 0000000..1932d60
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplCopyWork.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.plan;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+
+/**
+ * Marker work for Replication - behaves similar to CopyWork, but maps to ReplCopyTask,
+ * which will have mechanics to list the files in source to write to the destination,
+ * instead of copying them, if specified, falling back to copying if needed.
+ */
+@Explain(displayName = "Copy for Replication", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplCopyWork extends CopyWork {
+
+ protected boolean copyFiles = true; // governs copy-or-list-files behaviour.
+ // If set to true, behaves identically to a CopyWork
+ // If set to false, ReplCopyTask does a file-list of the things to be copied instead, and puts them in a file called _files.
+ // Default is set to mimic CopyTask, with the intent that any Replication code will explicitly flip this.
+
+ /**
+ * TODO : Refactor
+ *
+ * There is an upcoming patch that refactors this bit of code. Currently, the idea is the following:
+ *
+ * By default, ReplCopyWork will behave similarly to CopyWork, and simply copy
+ * along data from the source to destination. If, however, listFilesOnOutput is set,
+ * then, instead of copying the individual files to the destination, it simply creates
+ * a file called _files on destination that contains the list of the original files
+ * that were intended to be copied. Thus, we do not actually copy the files at CopyWork
+ * time.
+ *
+ * The flip side of this behaviour happens when, instead, readListFromInput is set. This
+ * flag, if set, changes the source behaviour of this CopyTask, and instead of copying
+ * explicit files, this will then fall back to a behaviour wherein an _files is read from
+ * the source, and the files specified by the _files are then copied to the destination.
+ *
+ * This allows us a lazy-copy-on-source and a pull-from destination semantic that we want
+ * to use from replication.
+ *
+ * ==
+ *
+ * The refactor intent, however, is to simplify this, so that we have only 1 flag that we set,
+ * called isLazy. If isLazy is set, then this is the equivalent of the current listFilesOnOutput,
+ * and will generate a _files file.
+ *
+ * As to the input, we simply decide on whether to use the lazy mode or not depending on the
+ * presence of a _files file on the input. If we see a _files on the input, we simply expand it
+ * to copy as needed. If we do not, we copy as normal.
+ *
+ */
+
+ protected boolean listFilesOnOutput = false; // governs copy-or-list-files behaviour
+ // If set to true, it'll iterate over input files, and for each file in the input,
+ // it'll write out an additional line in a _files file in the output.
+ // If set to false, it'll behave as a traditional CopyTask.
+
+ protected boolean readListFromInput = false; // governs remote-fetch-input behaviour
+ // If set to true, we'll assume that the input has a _files file present which lists
+ // the actual input files to copy, and we'll pull each of those on read.
+ // If set to false, it'll behave as a traditional CopyTask.
+
+ public ReplCopyWork() {
+ }
+
+ public ReplCopyWork(final Path fromPath, final Path toPath) {
+ super(fromPath, toPath, true);
+ }
+
+ public ReplCopyWork(final Path fromPath, final Path toPath, boolean errorOnSrcEmpty) {
+ super(fromPath, toPath, errorOnSrcEmpty);
+ }
+
+ public void setListFilesOnOutputBehaviour(boolean listFilesOnOutput){
+ this.listFilesOnOutput = listFilesOnOutput;
+ }
+
+ public boolean getListFilesOnOutputBehaviour(){
+ return this.listFilesOnOutput;
+ }
+
+ public void setReadListFromInput(boolean readListFromInput){
+ this.readListFromInput = readListFromInput;
+ }
+
+ public boolean getReadListFromInput(){
+ return this.readListFromInput;
+ }
+
+ // Specialization of getListFilesOnOutputBehaviour, with a FileStatus arg.
+ // We can fall back to the plain getListFilesOnOutputBehaviour behaviour, or we can do
+ // additional pattern matching to decide that certain files should not be listed, and
+ // should be copied instead - _metadata files, for instance.
+ // Currently, we use this to skip _metadata files, but we might decide that
+ // this is not the right place for it later on.
+ public boolean getListFilesOnOutputBehaviour(FileStatus f) {
+ if (f.getPath().toString().contains("_metadata")){
+ return false; // always copy _metadata files
+ }
+ return this.listFilesOnOutput;
+ }
+}
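
A short sketch of the two behaviours these flags select (the paths and class name are
hypothetical; the real wiring is done by ReplCopyTask.getDumpCopyTask and getLoadCopyTask later
in this commit):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.plan.ReplCopyWork;

    public class ReplCopyWorkModesExample {
      public static void main(String[] args) {
        Path src = new Path("/tmp/example/src");
        Path dst = new Path("/tmp/example/dst");

        // Dump side: list source files into a _files file at the destination instead of copying.
        ReplCopyWork dumpWork = new ReplCopyWork(src, dst, false);
        dumpWork.setListFilesOnOutputBehaviour(true);

        // Load side: expect a _files listing at the source and pull each listed file on read.
        ReplCopyWork loadWork = new ReplCopyWork(src, dst, false);
        loadWork.setReadListFromInput(true);
      }
    }
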
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
new file mode 100644
index 0000000..54b2bd1
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import static org.junit.Assert.*;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestReplicationSemanticAnalyzer {
+ static QueryState queryState;
+ static HiveConf conf;
+ static String defaultDB = "default";
+ static String tblName = "testReplSA";
+ static ArrayList<String> cols = new ArrayList<String>(Arrays.asList("col1", "col2"));
+ ParseDriver pd;
+ SemanticAnalyzer sA;
+
+
+ @BeforeClass
+ public static void initialize() throws HiveException {
+ queryState = new QueryState(new HiveConf(SemanticAnalyzer.class));
+ conf = queryState.getConf();
+ conf.set("hive.security.authorization.manager", "");
+ SessionState.start(conf);
+ Hive hiveDb = Hive.get(conf);
+ hiveDb.createTable(defaultDB + "." + tblName, cols, null, OrcInputFormat.class, OrcOutputFormat.class);
+ Table t = hiveDb.getTable(tblName);
+ }
+
+ @AfterClass
+ public static void teardown() throws HiveException {
+ }
+
+ @Test
+ public void testReplDumpParse() throws Exception {
+ ParseDriver pd = new ParseDriver();
+ String fromEventId = "100";
+ String toEventId = "200";
+ String batchSize = "50";
+ ASTNode root;
+ ASTNode child;
+
+ String query = "repl dump " + defaultDB;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_DUMP");
+ assertEquals(root.getChildCount(), 1);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 2);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 1);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId;
+
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), "TOK_TO");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(2);
+ assertEquals(child.getText(), toEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ query =
+ "repl dump " + defaultDB + "." + tblName + " from " + fromEventId + " to " + toEventId
+ + " batch " + batchSize;
+
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getChildCount(), 3);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), defaultDB);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), tblName);
+ assertEquals(child.getChildCount(), 0);
+
+ root = (ASTNode) root.getChild(2);
+ assertEquals(root.getText(), "TOK_FROM");
+ assertEquals(root.getChildCount(), 5);
+
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), fromEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), "TOK_TO");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(2);
+ assertEquals(child.getText(), toEventId);
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(3);
+ assertEquals(child.getText(), "TOK_BATCH");
+ assertEquals(child.getChildCount(), 0);
+
+ child = (ASTNode) root.getChild(4);
+ assertEquals(child.getText(), batchSize);
+ assertEquals(child.getChildCount(), 0);
+ }
+
+ @Test
+ public void testReplLoadParse() throws Exception {
+ // FileSystem fs = FileSystem.get(conf);
+ ParseDriver pd = new ParseDriver();
+ ASTNode root;
+ ASTNode child;
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ Path dumpRoot = new Path(replRoot, "next");
+ System.out.println(replRoot);
+ System.out.println(dumpRoot);
+ String newDB = "default_bak";
+
+ String query = "repl load from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_LOAD");
+ assertEquals(root.getChildCount(), 1);
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+ assertEquals(child.getChildCount(), 0);
+
+ query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ assertEquals(root.getText(), "TOK_REPL_LOAD");
+ assertEquals(root.getChildCount(), 2);
+ child = (ASTNode) root.getChild(0);
+ assertEquals(child.getText(), "'" + dumpRoot.toString() + "'");
+ assertEquals(child.getChildCount(), 0);
+ child = (ASTNode) root.getChild(1);
+ assertEquals(child.getText(), newDB);
+ assertEquals(child.getChildCount(), 0);
+ }
+
+ // TODO: add this test after repl dump analyze generates tasks
+ //@Test
+ public void testReplDumpAnalyze() throws Exception {
+
+ }
+
+ //@Test
+ public void testReplLoadAnalyze() throws Exception {
+ ParseDriver pd = new ParseDriver();
+ ASTNode root;
+ String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
+ FileSystem fs = FileSystem.get(conf);
+ Path dumpRoot = new Path(replRoot, "next");
+ System.out.println(replRoot);
+ System.out.println(dumpRoot);
+ String newDB = "default_bak";
+
+ // First create a dump
+ String query = "repl dump " + defaultDB;
+ root = (ASTNode) pd.parse(query).getChild(0);
+ ReplicationSemanticAnalyzer rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+
+ // Then analyze load
+ query = "repl load from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+ List<Task<? extends Serializable>> roots = rs.getRootTasks();
+ assertEquals(1, roots.size());
+
+ query = "repl load " + newDB + " from '" + dumpRoot.toString() + "'";
+ root = (ASTNode) pd.parse(query).getChild(0);
+ rs = (ReplicationSemanticAnalyzer) SemanticAnalyzerFactory.get(queryState, root);
+ rs.analyze(root, new Context(conf));
+ roots = rs.getRootTasks();
+ assertEquals(1, roots.size());
+ }
+}
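
Read together, the assertions in testReplDumpParse imply the following flat AST shape for the
fully qualified dump query (reconstructed from the asserts above; note that TOK_TO and
TOK_BATCH are leaf siblings of their values under TOK_FROM, not parents of them):

    TOK_REPL_DUMP
      default            <- db name
      testReplSA         <- table name
      TOK_FROM
        100              <- from event id
        TOK_TO
        200              <- to event id
        TOK_BATCH
        50               <- batch size
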
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
index 03515ea..0caa42a 100644
--- a/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
+++ b/ql/src/test/results/clientnegative/exim_00_unsupported_schema.q.out
@@ -19,4 +19,4 @@ POSTHOOK: type: LOAD
#### A masked pattern was here ####
POSTHOOK: Output: default@exim_department
#### A masked pattern was here ####
-FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile
+FAILED: SemanticException Invalid path only the following file systems accepted for export/import : hdfs,pfile,file
[2/2] hive git commit: HIVE-15151: Bootstrap support for replv2
(Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Posted by vg...@apache.org.
HIVE-15151: Bootstrap support for replv2 (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/739ac3af
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/739ac3af
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/739ac3af
Branch: refs/heads/master
Commit: 739ac3af32dd74ed735dbc3f942abaa1c293e1cb
Parents: 532e6a7
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Sun Nov 13 20:13:38 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Sun Nov 13 20:13:38 2016 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hadoop/hive/ql/exec/ReplCopyTask.java | 250 +++++++++
.../apache/hadoop/hive/ql/exec/TaskFactory.java | 2 +
.../hive/ql/parse/BaseSemanticAnalyzer.java | 16 +-
.../apache/hadoop/hive/ql/parse/EximUtil.java | 145 ++++-
.../hive/ql/parse/ExportSemanticAnalyzer.java | 41 +-
.../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 4 +
.../apache/hadoop/hive/ql/parse/HiveParser.g | 38 +-
.../hadoop/hive/ql/parse/IdentifiersParser.g | 1 +
.../hive/ql/parse/ImportSemanticAnalyzer.java | 472 ++++++++--------
.../hive/ql/parse/LoadSemanticAnalyzer.java | 2 +-
.../hive/ql/parse/MetaDataExportListener.java | 4 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 533 +++++++++++++++++++
.../hadoop/hive/ql/parse/ReplicationSpec.java | 39 +-
.../hive/ql/parse/SemanticAnalyzerFactory.java | 10 +
.../hadoop/hive/ql/plan/ReplCopyWork.java | 119 +++++
.../parse/TestReplicationSemanticAnalyzer.java | 259 +++++++++
.../exim_00_unsupported_schema.q.out | 2 +-
18 files changed, 1696 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b80f7b5..3a65c80 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -211,6 +211,7 @@ public class HiveConf extends Configuration {
*/
public static final HiveConf.ConfVars[] metaVars = {
HiveConf.ConfVars.METASTOREWAREHOUSE,
+ HiveConf.ConfVars.REPLDIR,
HiveConf.ConfVars.METASTOREURIS,
HiveConf.ConfVars.METASTORE_SERVER_PORT,
HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES,
@@ -432,6 +433,8 @@ public class HiveConf extends Configuration {
"HDFS root scratch dir for Hive jobs which gets created with write all (733) permission. " +
"For each connecting user, an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, " +
"with ${hive.scratch.dir.permission}."),
+ REPLDIR("hive.repl.rootdir","/user/hive/repl/",
+ "HDFS root dir for all replication dumps."),
LOCALSCRATCHDIR("hive.exec.local.scratchdir",
"${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
"Local scratch space for Hive jobs"),
@@ -2062,7 +2065,7 @@ public class HiveConf extends Configuration {
"When true the HDFS location stored in the index file will be ignored at runtime.\n" +
"If the data got moved or the name of the cluster got changed, the index data should still be usable."),
- HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile",
+ HIVE_EXIM_URI_SCHEME_WL("hive.exim.uri.scheme.whitelist", "hdfs,pfile,file",
"A comma separated list of acceptable URI schemes for import and export."),
// temporary variable for testing. This is added just to turn off this feature in case of a bug in
// deployment. It has not been documented in hive-default.xml intentionally, this should be removed
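
The new hive.repl.rootdir setting can also be overridden programmatically; a tiny illustration
(the override value is an example, the shipped default is /user/hive/repl/):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ReplDirConfigExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Point bootstrap dumps at a non-default HDFS root before running REPL DUMP.
        conf.setVar(HiveConf.ConfVars.REPLDIR, "/user/hive/staging/repl/");
        System.out.println(conf.getVar(HiveConf.ConfVars.REPLDIR));
      }
    }
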
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
new file mode 100644
index 0000000..4c0f817
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.util.StringUtils;
+
+public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
+
+
+ private static final long serialVersionUID = 1L;
+
+ private static transient final Logger LOG = LoggerFactory.getLogger(ReplCopyTask.class);
+
+ public ReplCopyTask(){
+ super();
+ }
+
+ @Override
+ protected int execute(DriverContext driverContext) {
+ LOG.debug("ReplCopyTask.execute()");
+ FileSystem dstFs = null;
+ Path toPath = null;
+ try {
+ Path fromPath = work.getFromPath();
+ toPath = work.getToPath();
+
+ console.printInfo("Copying data from " + fromPath.toString(), " to "
+ + toPath.toString());
+
+ ReplCopyWork rwork = ((ReplCopyWork)work);
+
+ FileSystem srcFs = fromPath.getFileSystem(conf);
+ dstFs = toPath.getFileSystem(conf);
+
+ List<FileStatus> srcFiles = new ArrayList<FileStatus>();
+ FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(srcFs, fromPath);
+ LOG.debug("ReplCopyTasks srcs=" + (srcs == null ? "null" : srcs.length));
+ if (! rwork.getReadListFromInput()){
+ if (srcs == null || srcs.length == 0) {
+ if (work.isErrorOnSrcEmpty()) {
+ console.printError("No files matching path: " + fromPath.toString());
+ return 3;
+ } else {
+ return 0;
+ }
+ }
+ } else {
+ LOG.debug("ReplCopyTask making sense of _files");
+ // Our input is probably the result of a _files listing, we should expand out _files.
+ srcFiles = filesInFileListing(srcFs,fromPath);
+ LOG.debug("ReplCopyTask _files contains:" + (srcFiles == null ? "null" : srcFiles.size()));
+ if (srcFiles == null){
+ if (work.isErrorOnSrcEmpty()) {
+ console.printError("No _files entry found on source: " + fromPath.toString());
+ return 5;
+ } else {
+ return 0;
+ }
+ }
+ }
+ // Add in all the lone filecopies expected as well - applies to
+ // both _files case stragglers and regular copies
+ srcFiles.addAll(Arrays.asList(srcs));
+ LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size()));
+
+ boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+ if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+ console.printError("Cannot make target directory: " + toPath.toString());
+ return 2;
+ }
+
+ BufferedWriter listBW = null;
+ if (rwork.getListFilesOnOutputBehaviour()){
+ Path listPath = new Path(toPath,"_files");
+ LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
+ if (dstFs.exists(listPath)){
+ console.printError("Cannot make target _files file:" + listPath.toString());
+ return 4;
+ }
+ listBW = new BufferedWriter(new OutputStreamWriter(dstFs.create(listPath)));
+ // TODO : verify that not specifying charset here does not bite us
+ // later(for cases where filenames have unicode chars)
+ }
+
+ for (FileStatus oneSrc : srcFiles) {
+ console.printInfo("Copying file: " + oneSrc.getPath().toString());
+ LOG.debug("Copying file: " + oneSrc.getPath().toString());
+ if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
+ FileSystem actualSrcFs = null;
+ if (rwork.getReadListFromInput()){
+ // TODO : filesystemcache prevents this from being a perf nightmare, but we
+ // should still probably follow up to see if we need to do something better here.
+ actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+ } else {
+ actualSrcFs = srcFs;
+ }
+
+ LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
+ if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
+ false, // delete source
+ true, // overwrite destination
+ conf)) {
+ console.printError("Failed to copy: '" + oneSrc.getPath().toString()
+ + "to: '" + toPath.toString() + "'");
+ return 1;
+ }
+ }else{
+ LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
+ console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
+ listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+ }
+ }
+
+ if (listBW != null){
+ listBW.close();
+ }
+
+ return 0;
+
+ } catch (Exception e) {
+ console.printError("Failed with exception " + e.getMessage(), "\n"
+ + StringUtils.stringifyException(e));
+ return (1);
+ }
+ }
+
+
+ private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
+ throws IOException {
+ Path fileListing = new Path(path, "_files");
+ LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
+ if (! fs.exists(fileListing)){
+ LOG.debug("ReplCopyTask : _files does not exist");
+ return null; // Returning null from this fn can serve as an err condition.
+ // On success, but with nothing to return, we can return an empty list.
+ }
+
+ List<FileStatus> ret = new ArrayList<FileStatus>();
+ BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)));
+ // TODO : verify if skipping charset here is okay
+
+ String line = null;
+ while ( (line = br.readLine()) != null){
+ LOG.debug("ReplCopyTask :_filesReadLine:" + line);
+ Path p = new Path(line);
+ FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this okay, but if not, revisit
+ ret.add(srcFs.getFileStatus(p));
+ // Note - we need srcFs rather than fs, because it is possible that the _files lists files
+ // which are from a different filesystem than the fs where the _files file itself was loaded
+ // from. Currently, it is possible, for eg., to do REPL LOAD hdfs://<ip>/dir/ and for the _files
+ // in it to contain hdfs://<name>/ entries, and/or vice-versa, and this causes errors.
+ // It might also be possible that there will be a mix of them in a given _files file.
+ // TODO: revisit close to the end of replv2 dev, to see if our assumption now still holds,
+ // and if not so, optimize.
+ }
+
+ return ret;
+ }
+
+ @Override
+ public StageType getType() {
+ return StageType.COPY;
+ // there's no extensive need for this to have its own type - it mirrors
+ // the intent of copy enough. This might change later, though.
+ }
+
+ @Override
+ public String getName() {
+ return "REPL_COPY";
+ }
+
+ public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+ Task<?> copyTask = null;
+ LOG.debug("ReplCopyTask:getLoadCopyTask: "+srcPath + "=>" + dstPath);
+ if (replicationSpec.isInReplicationScope()){
+ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+ LOG.debug("ReplCopyTask:\trcwork");
+ if (replicationSpec.isLazy()){
+ LOG.debug("ReplCopyTask:\tlazy");
+ rcwork.setReadListFromInput(true);
+ }
+ copyTask = TaskFactory.get(rcwork, conf);
+ } else {
+ LOG.debug("ReplCopyTask:\tcwork");
+ copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+ }
+ return copyTask;
+ }
+
+ public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath, HiveConf conf) {
+ Task<?> copyTask = null;
+ LOG.debug("ReplCopyTask:getDumpCopyTask: "+srcPath + "=>" + dstPath);
+ if (replicationSpec.isInReplicationScope()){
+ ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
+ LOG.debug("ReplCopyTask:\trcwork");
+ if (replicationSpec.isLazy()){
+ LOG.debug("ReplCopyTask:\tlazy");
+ rcwork.setListFilesOnOutputBehaviour(true);
+ }
+ copyTask = TaskFactory.get(rcwork, conf);
+ } else {
+ LOG.debug("ReplCopyTask:\tcwork");
+ copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
+ }
+ return copyTask;
+ }
+
+}
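
The two static factories above are the intended entry points for export/import code; a hedged
fragment (replicationSpec, srcPath, dumpPath, tblDataPath and conf are assumed to exist in the
caller) showing how they pair up:

    // Dump side: with a lazy ReplicationSpec this yields a ReplCopyTask that writes a _files
    // listing (one fully qualified source file URI per line) instead of copying the data.
    Task<?> dumpCopy = ReplCopyTask.getDumpCopyTask(replicationSpec, srcPath, dumpPath, conf);

    // Load side: the matching task reads that _files listing back and pulls each listed file
    // from its own source filesystem into the destination table/partition directory.
    Task<?> loadCopy = ReplCopyTask.getLoadCopyTask(replicationSpec, dumpPath, tblDataPath, conf);
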
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 14fd61a..d61a460 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.FunctionWork;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
+import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
import org.apache.hadoop.hive.ql.plan.SparkWork;
import org.apache.hadoop.hive.ql.plan.StatsNoJobWork;
import org.apache.hadoop.hive.ql.plan.StatsWork;
@@ -77,6 +78,7 @@ public final class TaskFactory {
taskvec.add(new TaskTuple<MoveWork>(MoveWork.class, MoveTask.class));
taskvec.add(new TaskTuple<FetchWork>(FetchWork.class, FetchTask.class));
taskvec.add(new TaskTuple<CopyWork>(CopyWork.class, CopyTask.class));
+ taskvec.add(new TaskTuple<ReplCopyWork>(ReplCopyWork.class, ReplCopyTask.class));
taskvec.add(new TaskTuple<DDLWork>(DDLWork.class, DDLTask.class));
taskvec.add(new TaskTuple<FunctionWork>(FunctionWork.class,
FunctionTask.class));
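With ReplCopyWork registered above, TaskFactory resolves it to a ReplCopyTask just like the other
work/task pairs. Illustrative call (srcPath, dstPath and conf assumed to exist):

    ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false);
    Task<?> copyTask = TaskFactory.get(rcwork, conf); // now yields a ReplCopyTask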
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index ffb6ae3..7b63c52 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -1607,8 +1607,12 @@ public abstract class BaseSemanticAnalyzer {
}
protected WriteEntity toWriteEntity(Path location) throws SemanticException {
+ return toWriteEntity(location,conf);
+ }
+
+ public static WriteEntity toWriteEntity(Path location, HiveConf conf) throws SemanticException {
try {
- Path path = tryQualifyPath(location);
+ Path path = tryQualifyPath(location,conf);
return new WriteEntity(path, FileUtils.isLocalFile(conf, path.toUri()));
} catch (Exception e) {
throw new SemanticException(e);
@@ -1620,8 +1624,12 @@ public abstract class BaseSemanticAnalyzer {
}
protected ReadEntity toReadEntity(Path location) throws SemanticException {
+ return toReadEntity(location, conf);
+ }
+
+ public static ReadEntity toReadEntity(Path location, HiveConf conf) throws SemanticException {
try {
- Path path = tryQualifyPath(location);
+ Path path = tryQualifyPath(location, conf);
return new ReadEntity(path, FileUtils.isLocalFile(conf, path.toUri()));
} catch (Exception e) {
throw new SemanticException(e);
@@ -1629,6 +1637,10 @@ public abstract class BaseSemanticAnalyzer {
}
private Path tryQualifyPath(Path path) throws IOException {
+ return tryQualifyPath(path,conf);
+ }
+
+ public static Path tryQualifyPath(Path path, HiveConf conf) throws IOException {
try {
return path.getFileSystem(conf).makeQualified(path);
} catch (IOException e) {
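The new static overloads let callers outside a BaseSemanticAnalyzer instance (such as the static
prepareImport/prepareExport paths further down) qualify paths and build entities by passing the
HiveConf explicitly. Sketch of the call pattern (location and conf assumed to exist):

    Path loc = new Path(parsedLocation);
    ReadEntity input = BaseSemanticAnalyzer.toReadEntity(loc, conf);
    WriteEntity output = BaseSemanticAnalyzer.toWriteEntity(loc, conf);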
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 167f7a5..a0d492d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -20,6 +20,14 @@ package org.apache.hadoop.hive.ql.parse;
import com.google.common.base.Function;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -45,10 +53,12 @@ import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -63,8 +73,73 @@ import java.util.TreeMap;
*/
public class EximUtil {
+ public static final String METADATA_NAME="_metadata";
+
private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
+ /**
+ * Wrapper class that carries the common non-static BaseSemanticAnalyzer members
+ * into static generic methods, so that the function signatures do not become
+ * overwhelming from passing each of these into every function.
+ *
+ * Note, however, that since this is constructed with args passed in,
+ * parts of the context, such as the tasks or inputs, might have been
+ * overridden with temporary context values, rather than being exactly
+ * 1:1 equivalent to BaseSemanticAnalyzer.getRootTasks() or BSA.getInputs().
+ */
+ public static class SemanticAnalyzerWrapperContext {
+ private HiveConf conf;
+ private Hive db;
+ private HashSet<ReadEntity> inputs;
+ private HashSet<WriteEntity> outputs;
+ private List<Task<? extends Serializable>> tasks;
+ private Logger LOG;
+ private Context ctx;
+
+ public HiveConf getConf() {
+ return conf;
+ }
+
+ public Hive getHive() {
+ return db;
+ }
+
+ public HashSet<ReadEntity> getInputs() {
+ return inputs;
+ }
+
+ public HashSet<WriteEntity> getOutputs() {
+ return outputs;
+ }
+
+ public List<Task<? extends Serializable>> getTasks() {
+ return tasks;
+ }
+
+ public Logger getLOG() {
+ return LOG;
+ }
+
+ public Context getCtx() {
+ return ctx;
+ }
+
+ public SemanticAnalyzerWrapperContext(HiveConf conf, Hive db,
+ HashSet<ReadEntity> inputs,
+ HashSet<WriteEntity> outputs,
+ List<Task<? extends Serializable>> tasks,
+ Logger LOG, Context ctx){
+ this.conf = conf;
+ this.db = db;
+ this.inputs = inputs;
+ this.outputs = outputs;
+ this.tasks = tasks;
+ this.LOG = LOG;
+ this.ctx = ctx;
+ }
+ }
+
+
private EximUtil() {
}
@@ -162,10 +237,41 @@ public class EximUtil {
}
/* major version number should match for backward compatibility */
- public static final String METADATA_FORMAT_VERSION = "0.1";
+ public static final String METADATA_FORMAT_VERSION = "0.2";
+
/* If null, then the major version number should match */
public static final String METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION = null;
+ public static void createDbExportDump(
+ FileSystem fs, Path metadataPath, Database dbObj,
+ ReplicationSpec replicationSpec) throws IOException, SemanticException {
+
+ // WARNING NOTE : at this point, createDbExportDump lives only in a world where ReplicationSpec is in replication scope
+ // If we later make this work for non-repl cases, analysis of this logic might become necessary. Also, this is using
+ // Replv2 semantics, i.e. with listFiles laziness (no copy at export time)
+
+ OutputStream out = fs.create(metadataPath);
+ JsonGenerator jgen = (new JsonFactory()).createJsonGenerator(out);
+ jgen.writeStartObject();
+ jgen.writeStringField("version",METADATA_FORMAT_VERSION);
+ dbObj.putToParameters(ReplicationSpec.KEY.CURR_STATE_ID.toString(), replicationSpec.getCurrentReplicationState());
+
+ if (METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION != null) {
+ jgen.writeStringField("fcversion",METADATA_FORMAT_FORWARD_COMPATIBLE_VERSION);
+ }
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ try {
+ jgen.writeStringField("db", serializer.toString(dbObj, "UTF-8"));
+ } catch (TException e) {
+ throw new SemanticException(
+ ErrorMsg.ERROR_SERIALIZE_METASTORE
+ .getMsg(), e);
+ }
+
+ jgen.writeEndObject();
+ jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
+ }
+
public static void createExportDump(FileSystem fs, Path metadataPath,
org.apache.hadoop.hive.ql.metadata.Table tableHandle,
Iterable<org.apache.hadoop.hive.ql.metadata.Partition> partitions,
@@ -255,19 +361,25 @@ public class EximUtil {
* Utility class to help return complex value from readMetaData function
*/
public static class ReadMetaData {
+ private final Database db;
private final Table table;
private final Iterable<Partition> partitions;
private final ReplicationSpec replicationSpec;
public ReadMetaData(){
- this(null,null,new ReplicationSpec());
+ this(null,null,null,new ReplicationSpec());
}
- public ReadMetaData(Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
+ public ReadMetaData(Database db, Table table, Iterable<Partition> partitions, ReplicationSpec replicationSpec){
+ this.db = db;
this.table = table;
this.partitions = partitions;
this.replicationSpec = replicationSpec;
}
+ public Database getDatabase(){
+ return db;
+ }
+
public Table getTable() {
return table;
}
@@ -298,12 +410,21 @@ public class EximUtil {
String version = jsonContainer.getString("version");
String fcversion = getJSONStringEntry(jsonContainer, "fcversion");
checkCompatibility(version, fcversion);
+
+ String dbDesc = getJSONStringEntry(jsonContainer, "db");
String tableDesc = getJSONStringEntry(jsonContainer,"table");
+ TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+
+ Database db = null;
+ if (dbDesc != null){
+ db = new Database();
+ deserializer.deserialize(db, dbDesc, "UTF-8");
+ }
+
Table table = null;
List<Partition> partitionsList = null;
if (tableDesc != null){
table = new Table();
- TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
deserializer.deserialize(table, tableDesc, "UTF-8");
// TODO : jackson-streaming-iterable-redo this
JSONArray jsonPartitions = new JSONArray(jsonContainer.getString("partitions"));
@@ -316,7 +437,7 @@ public class EximUtil {
}
}
- return new ReadMetaData(table, partitionsList,readReplicationSpec(jsonContainer));
+ return new ReadMetaData(db, table, partitionsList,readReplicationSpec(jsonContainer));
} catch (JSONException e) {
throw new SemanticException(ErrorMsg.ERROR_SERIALIZE_METADATA.getMsg(), e);
} catch (TException e) {
@@ -438,4 +559,18 @@ public class EximUtil {
}
return true;
}
+
+ public static PathFilter getDirectoryFilter(final FileSystem fs) {
+ // TODO : isn't there a prior impl of an isDirectory utility PathFilter so users don't have to write their own?
+ return new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ try {
+ return fs.isDirectory(p);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
}
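Taken together, the EximUtil additions give a db-level dump/read round trip: createDbExportDump()
writes a version-0.2 _metadata file carrying the serialized Database (with the current replication
state id stamped into its parameters), and readMetaData() now hands that Database back through
ReadMetaData.getDatabase(). A minimal sketch, assuming a live FileSystem fs, a dump directory path,
a metastore Database dbObj, and a ReplicationSpec that is in replication scope:

    Path dbMetadata = new Path(dumpDirPath, EximUtil.METADATA_NAME);
    EximUtil.createDbExportDump(fs, dbMetadata, dbObj, replicationSpec);

    // ... later, on the load side ...
    EximUtil.ReadMetaData rv = EximUtil.readMetaData(fs, dbMetadata);
    Database replicatedDb = rv.getDatabase();        // non-null only for a db-level dump
    ReplicationSpec spec = rv.getReplicationSpec();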
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index 475f2c9..f61274b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -22,6 +22,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
+import java.util.HashSet;
+import java.util.List;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.FileStatus;
@@ -29,16 +31,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.plan.CopyWork;
+import org.slf4j.Logger;
/**
* ExportSemanticAnalyzer.
@@ -88,6 +93,18 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
+ // All parsing is done, we're now good to start the export process.
+ prepareExport(ast, toURI, ts, replicationSpec, db, conf, ctx, rootTasks, inputs, outputs, LOG);
+
+ }
+
+ // FIXME : Move to EximUtil - it's okay for this to stay here for a little while more till we finalize the statics
+ public static void prepareExport(
+ ASTNode ast, URI toURI, TableSpec ts,
+ ReplicationSpec replicationSpec, Hive db, HiveConf conf,
+ Context ctx, List<Task<? extends Serializable>> rootTasks, HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
+ Logger LOG) throws SemanticException {
+
if (ts != null) {
try {
EximUtil.validateTable(ts.tableHandle);
@@ -109,6 +126,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
try {
+
FileSystem fs = FileSystem.get(toURI, conf);
Path toPath = new Path(toURI.getScheme(), toURI.getAuthority(), toURI.getPath());
try {
@@ -156,12 +174,12 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
EximUtil.createExportDump(
FileSystem.getLocal(conf),
path,
- (ts != null ? ts.tableHandle: null),
+ (ts != null ? ts.tableHandle : null),
partitions,
replicationSpec);
- Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
- path, new Path(toURI), false), conf);
+ Task<? extends Serializable> rTask = ReplCopyTask.getDumpCopyTask(replicationSpec, path, new Path(toURI), conf);
+
rootTasks.add(rTask);
LOG.debug("_metadata file written into " + path.toString()
+ " and then copied to " + toURI.toString());
@@ -177,23 +195,22 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (Partition partition : partitions) {
Path fromPath = partition.getDataLocation();
Path toPartPath = new Path(parentPath, partition.getName());
- Task<? extends Serializable> rTask = TaskFactory.get(
- new CopyWork(fromPath, toPartPath, false),
- conf);
+ Task<? extends Serializable> rTask =
+ ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toPartPath, conf);
rootTasks.add(rTask);
inputs.add(new ReadEntity(partition));
}
} else {
Path fromPath = ts.tableHandle.getDataLocation();
Path toDataPath = new Path(parentPath, "data");
- Task<? extends Serializable> rTask = TaskFactory.get(new CopyWork(
- fromPath, toDataPath, false), conf);
+ Task<? extends Serializable> rTask =
+ ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
rootTasks.add(rTask);
inputs.add(new ReadEntity(ts.tableHandle));
}
- outputs.add(toWriteEntity(parentPath));
+ outputs.add(toWriteEntity(parentPath,conf));
}
-
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
index b467c51..4357328 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g
@@ -338,6 +338,10 @@ KW_EXTRACT: 'EXTRACT';
KW_FLOOR: 'FLOOR';
KW_MERGE: 'MERGE';
KW_MATCHED: 'MATCHED';
+KW_REPL: 'REPL';
+KW_DUMP: 'DUMP';
+KW_BATCH: 'BATCH';
+KW_STATUS: 'STATUS';
// Operators
// NOTE: if you add a new function/operator, add it to sysFuncNames so that describe function _FUNC_ will work.
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index bd53a36..55915a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -388,7 +388,11 @@ TOK_MERGE;
TOK_MATCHED;
TOK_NOT_MATCHED;
TOK_UPDATE;
-TOK_DELETE;
+TOK_DELETE;
+TOK_REPL_DUMP;
+TOK_REPL_LOAD;
+TOK_REPL_STATUS;
+TOK_BATCH;
+TOK_TO;
}
@@ -738,6 +742,9 @@ execStatement
| loadStatement
| exportStatement
| importStatement
+ | replDumpStatement
+ | replLoadStatement
+ | replStatusStatement
| ddlStatement
| deleteStatement
| updateStatement
@@ -779,6 +786,35 @@ importStatement
-> ^(TOK_IMPORT $path $tab? $ext? tableLocation?)
;
+replDumpStatement
+@init { pushMsg("replication dump statement", state); }
+@after { popMsg(state); }
+ : KW_REPL KW_DUMP
+ (dbName=identifier) (DOT tblName=identifier)?
+ (KW_FROM (eventId=Number)
+ (KW_TO (rangeEnd=Number))?
+ (KW_BATCH (batchSize=Number))?
+ )?
+ -> ^(TOK_REPL_DUMP $dbName $tblName? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_BATCH $batchSize)?)? )
+ ;
+
+replLoadStatement
+@init { pushMsg("replication load statement", state); }
+@after { popMsg(state); }
+ : KW_REPL KW_LOAD
+ ((dbName=identifier) (DOT tblName=identifier)?)?
+ KW_FROM (path=StringLiteral)
+ -> ^(TOK_REPL_LOAD $path $dbName? $tblName?)
+ ;
+
+replStatusStatement
+@init { pushMsg("replication load statement", state); }
+@after { popMsg(state); }
+ : KW_REPL KW_STATUS
+ (dbName=identifier) (DOT tblName=identifier)?
+ -> ^(TOK_REPL_STATUS $dbName $tblName?)
+ ;
+
ddlStatement
@init { pushMsg("ddl statement", state); }
@after { popMsg(state); }
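For reference, the three new rules accept statements of roughly this shape (db and table names are
made up, and the FROM/TO/BATCH clause of REPL DUMP is optional):

    REPL DUMP repldb
    REPL DUMP repldb.repltbl FROM 100 TO 200 BATCH 50
    REPL LOAD replloaddb FROM '/tmp/dumpdir'
    REPL STATUS replloaddb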
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
index aa92739..a82083b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g
@@ -762,6 +762,7 @@ nonReserved
| KW_KEY
| KW_MERGE
| KW_MATCHED
+ | KW_REPL | KW_DUMP | KW_BATCH | KW_STATUS
;
//The following SQL2011 reserved keywords are used as function name only, but not as identifiers.
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9986fcf..3420efd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -48,16 +49,17 @@ import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
-import org.apache.hadoop.hive.ql.plan.CopyWork;
import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
@@ -73,12 +75,14 @@ import org.apache.hadoop.mapred.OutputFormat;
*/
public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
- public static final String METADATA_NAME="_metadata";
-
public ImportSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
+ // FIXME : Note that the tableExists flag as used by Auth is something of a hack and
+ // assumes only 1 table will ever be imported - this assumption is broken by
+ // REPL LOAD. We need to fix this, maybe by continuing the hack and replacing
+ // the flag with a map, maybe by coming up with a better api for it.
private boolean tableExists = false;
public boolean existsTable() {
@@ -92,14 +96,16 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
boolean isLocationSet = false;
boolean isExternalSet = false;
- boolean isTableSet = false;
- boolean isDbNameSet = false;
boolean isPartSpecSet = false;
String parsedLocation = null;
String parsedTableName = null;
String parsedDbName = null;
LinkedHashMap<String, String> parsedPartSpec = new LinkedHashMap<String, String>();
+ // waitOnCreateDb determines whether non-existence of the target db is an error.
+ // For regular imports it is an error, so this stays false.
+ boolean waitOnCreateDb = false;
+
for (int i = 1; i < ast.getChildCount(); ++i){
ASTNode child = (ASTNode) ast.getChild(i);
switch (child.getToken().getType()){
@@ -111,14 +117,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
parsedLocation = EximUtil.relativeToAbsolutePath(conf, unescapeSQLString(child.getChild(0).getText()));
break;
case HiveParser.TOK_TAB:
- isTableSet = true;
ASTNode tableNameNode = (ASTNode) child.getChild(0);
Map.Entry<String,String> dbTablePair = getDbTableNamePair(tableNameNode);
parsedDbName = dbTablePair.getKey();
parsedTableName = dbTablePair.getValue();
- if (parsedDbName != null){
- isDbNameSet = true;
- }
// get partition metadata if partition specified
if (child.getChildCount() == 2) {
ASTNode partspec = (ASTNode) child.getChild(1);
@@ -130,111 +132,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
// parsing statement is now done, on to logic.
+ tableExists = prepareImport(
+ isLocationSet, isExternalSet, isPartSpecSet, waitOnCreateDb,
+ parsedLocation, parsedTableName, parsedDbName, parsedPartSpec, fromTree.getText(),
+ new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx));
- // initialize load path
- URI fromURI = EximUtil.getValidatedURI(conf, stripQuotes(fromTree.getText()));
- FileSystem fs = FileSystem.get(fromURI, conf);
- Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
- inputs.add(toReadEntity(fromPath));
-
- EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
- try {
- rv = EximUtil.readMetaData(fs, new Path(fromPath, METADATA_NAME));
- } catch (IOException e) {
- throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
- }
-
- ReplicationSpec replicationSpec = rv.getReplicationSpec();
- if (replicationSpec.isNoop()){
- // nothing to do here, silently return.
- return;
- }
-
- String dbname = SessionState.get().getCurrentDatabase();
- if (isDbNameSet){
- // If the parsed statement contained a db.tablename specification, prefer that.
- dbname = parsedDbName;
- }
-
- // Create table associated with the import
- // Executed if relevant, and used to contain all the other details about the table if not.
- CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
-
- if (isExternalSet){
- tblDesc.setExternal(isExternalSet);
- // This condition-check could have been avoided, but to honour the old
- // default of not calling if it wasn't set, we retain that behaviour.
- // TODO:cleanup after verification that the outer if isn't really needed here
- }
-
- if (isLocationSet){
- tblDesc.setLocation(parsedLocation);
- inputs.add(toReadEntity(parsedLocation));
- }
-
- if (isTableSet){
- tblDesc.setTableName(parsedTableName);
- }
-
- List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
- Iterable<Partition> partitions = rv.getPartitions();
- for (Partition partition : partitions) {
- // TODO: this should ideally not create AddPartitionDesc per partition
- AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
- partitionDescs.add(partsDesc);
- }
-
- if (isPartSpecSet){
- // The import specification asked for only a particular partition to be loaded
- // We load only that, and ignore all the others.
- boolean found = false;
- for (Iterator<AddPartitionDesc> partnIter = partitionDescs
- .listIterator(); partnIter.hasNext();) {
- AddPartitionDesc addPartitionDesc = partnIter.next();
- if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
- found = true;
- } else {
- partnIter.remove();
- }
- }
- if (!found) {
- throw new SemanticException(
- ErrorMsg.INVALID_PARTITION
- .getMsg(" - Specified partition not found in import directory"));
- }
- }
-
- if (tblDesc.getTableName() == null) {
- // Either we got the tablename from the IMPORT statement (first priority)
- // or from the export dump.
- throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
- } else {
- conf.set("import.destination.table", tblDesc.getTableName());
- for (AddPartitionDesc addPartitionDesc : partitionDescs) {
- addPartitionDesc.setTableName(tblDesc.getTableName());
- }
- }
-
- Warehouse wh = new Warehouse(conf);
- Table table = tableIfExists(tblDesc);
-
- if (table != null){
- checkTable(table, tblDesc,replicationSpec);
- LOG.debug("table " + tblDesc.getTableName() + " exists: metadata checked");
- tableExists = true;
- }
-
- if (!replicationSpec.isInReplicationScope()){
- createRegularImportTasks(
- rootTasks, tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh);
- } else {
- createReplImportTasks(
- rootTasks, tblDesc, partitionDescs,
- isPartSpecSet, replicationSpec, table,
- fromURI, fs, wh);
- }
} catch (SemanticException e) {
throw e;
} catch (Exception e) {
@@ -265,7 +167,123 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private AddPartitionDesc getBaseAddPartitionDescFromPartition(
+ public static boolean prepareImport(
+ boolean isLocationSet, boolean isExternalSet, boolean isPartSpecSet, boolean waitOnCreateDb,
+ String parsedLocation, String parsedTableName, String parsedDbName,
+ LinkedHashMap<String, String> parsedPartSpec,
+ String fromLocn, EximUtil.SemanticAnalyzerWrapperContext x
+ ) throws IOException, MetaException, HiveException, URISyntaxException {
+
+ // initialize load path
+ URI fromURI = EximUtil.getValidatedURI(x.getConf(), stripQuotes(fromLocn));
+ Path fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+
+ FileSystem fs = FileSystem.get(fromURI, x.getConf());
+ x.getInputs().add(toReadEntity(fromPath, x.getConf()));
+
+ EximUtil.ReadMetaData rv = new EximUtil.ReadMetaData();
+ try {
+ rv = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+ } catch (IOException e) {
+ throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(), e);
+ }
+
+ ReplicationSpec replicationSpec = rv.getReplicationSpec();
+ if (replicationSpec.isNoop()){
+ // nothing to do here, silently return.
+ return false;
+ }
+
+ String dbname = SessionState.get().getCurrentDatabase();
+ if ((parsedDbName !=null) && (!parsedDbName.isEmpty())){
+ // If the parsed statement contained a db.tablename specification, prefer that.
+ dbname = parsedDbName;
+ }
+
+ // Create table associated with the import
+ // Executed if relevant, and used to contain all the other details about the table if not.
+ CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
+
+ if (isExternalSet){
+ tblDesc.setExternal(isExternalSet);
+ // This condition-check could have been avoided, but to honour the old
+ // default of not calling if it wasn't set, we retain that behaviour.
+ // TODO:cleanup after verification that the outer if isn't really needed here
+ }
+
+ if (isLocationSet){
+ tblDesc.setLocation(parsedLocation);
+ x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf()));
+ }
+
+ if ((parsedTableName!= null) && (!parsedTableName.isEmpty())){
+ tblDesc.setTableName(parsedTableName);
+ }
+
+ List<AddPartitionDesc> partitionDescs = new ArrayList<AddPartitionDesc>();
+ Iterable<Partition> partitions = rv.getPartitions();
+ for (Partition partition : partitions) {
+ // TODO: this should ideally not create AddPartitionDesc per partition
+ AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
+ partitionDescs.add(partsDesc);
+ }
+
+ if (isPartSpecSet){
+ // The import specification asked for only a particular partition to be loaded
+ // We load only that, and ignore all the others.
+ boolean found = false;
+ for (Iterator<AddPartitionDesc> partnIter = partitionDescs
+ .listIterator(); partnIter.hasNext();) {
+ AddPartitionDesc addPartitionDesc = partnIter.next();
+ if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+ found = true;
+ } else {
+ partnIter.remove();
+ }
+ }
+ if (!found) {
+ throw new SemanticException(
+ ErrorMsg.INVALID_PARTITION
+ .getMsg(" - Specified partition not found in import directory"));
+ }
+ }
+
+ if (tblDesc.getTableName() == null) {
+ // Either we got the tablename from the IMPORT statement (first priority)
+ // or from the export dump.
+ throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+ } else {
+ x.getConf().set("import.destination.table", tblDesc.getTableName());
+ for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+ addPartitionDesc.setTableName(tblDesc.getTableName());
+ }
+ }
+
+ Warehouse wh = new Warehouse(x.getConf());
+ Table table = tableIfExists(tblDesc, x.getHive());
+ boolean tableExists = false;
+
+ if (table != null){
+ checkTable(table, tblDesc,replicationSpec, x.getConf());
+ x.getLOG().debug("table " + tblDesc.getTableName() + " exists: metadata checked");
+ tableExists = true;
+ }
+
+ if (!replicationSpec.isInReplicationScope()){
+ createRegularImportTasks(
+ tblDesc, partitionDescs,
+ isPartSpecSet, replicationSpec, table,
+ fromURI, fs, wh, x);
+ } else {
+ createReplImportTasks(
+ tblDesc, partitionDescs,
+ isPartSpecSet, replicationSpec, waitOnCreateDb, table,
+ fromURI, fs, wh, x);
+ }
+ return tableExists;
+ }
+
+ private static AddPartitionDesc getBaseAddPartitionDescFromPartition(
Path fromPath, String dbname, CreateTableDesc tblDesc, Partition partition) throws MetaException {
AddPartitionDesc partsDesc = new AddPartitionDesc(dbname, tblDesc.getTableName(),
EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
@@ -284,7 +302,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return partsDesc;
}
- private CreateTableDesc getBaseCreateTableDescFromTable(String dbName,
+ private static CreateTableDesc getBaseCreateTableDescFromTable(String dbName,
org.apache.hadoop.hive.metastore.api.Table table) {
if ((table.getPartitionKeys() == null) || (table.getPartitionKeys().size() == 0)){
table.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
@@ -318,94 +336,95 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return tblDesc;
}
- private Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath) {
+ private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path tgtPath,
+ ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x) {
Path dataPath = new Path(fromURI.toString(), "data");
- Path tmpPath = ctx.getExternalTmpPath(tgtPath);
- Task<?> copyTask = TaskFactory.get(new CopyWork(dataPath,
- tmpPath, false), conf);
+ Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, x.getConf());
LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
Utilities.getTableDesc(table), new TreeMap<String, String>(),
replace);
- Task<?> loadTableTask = TaskFactory.get(new MoveWork(getInputs(),
- getOutputs(), loadTableWork, null, false), conf);
+ Task<?> loadTableTask = TaskFactory.get(new MoveWork(x.getInputs(),
+ x.getOutputs(), loadTableWork, null, false), x.getConf());
copyTask.addDependentTask(loadTableTask);
- rootTasks.add(copyTask);
+ x.getTasks().add(copyTask);
return loadTableTask;
}
- private Task<?> createTableTask(CreateTableDesc tableDesc){
+ private static Task<?> createTableTask(CreateTableDesc tableDesc, EximUtil.SemanticAnalyzerWrapperContext x){
return TaskFactory.get(new DDLWork(
- getInputs(),
- getOutputs(),
+ x.getInputs(),
+ x.getOutputs(),
tableDesc
- ), conf);
+ ), x.getConf());
}
- private Task<?> dropTableTask(Table table){
+ private static Task<?> dropTableTask(Table table, EximUtil.SemanticAnalyzerWrapperContext x){
return TaskFactory.get(new DDLWork(
- getInputs(),
- getOutputs(),
+ x.getInputs(),
+ x.getOutputs(),
new DropTableDesc(table.getTableName(), null, true, true, null)
- ), conf);
+ ), x.getConf());
}
- private Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc) {
+ private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
+ EximUtil.SemanticAnalyzerWrapperContext x) {
tableDesc.setReplaceMode(true);
return TaskFactory.get(new DDLWork(
- getInputs(),
- getOutputs(),
+ x.getInputs(),
+ x.getOutputs(),
tableDesc
- ), conf);
+ ), x.getConf());
}
- private Task<? extends Serializable> alterSinglePartition(
+ private static Task<? extends Serializable> alterSinglePartition(
URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
Table table, Warehouse wh, AddPartitionDesc addPartitionDesc,
- ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn) {
+ ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
+ EximUtil.SemanticAnalyzerWrapperContext x) {
addPartitionDesc.setReplaceMode(true);
addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
return TaskFactory.get(new DDLWork(
- getInputs(),
- getOutputs(),
+ x.getInputs(),
+ x.getOutputs(),
addPartitionDesc
- ), conf);
+ ), x.getConf());
}
-
- private Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
+ private static Task<?> addSinglePartition(URI fromURI, FileSystem fs, CreateTableDesc tblDesc,
Table table, Warehouse wh,
- AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec)
+ AddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x)
throws MetaException, IOException, HiveException {
AddPartitionDesc.OnePartitionDesc partSpec = addPartitionDesc.getPartition(0);
if (tblDesc.isExternal() && tblDesc.getLocation() == null) {
- LOG.debug("Importing in-place: adding AddPart for partition "
+ x.getLOG().debug("Importing in-place: adding AddPart for partition "
+ partSpecToString(partSpec.getPartSpec()));
// addPartitionDesc already has the right partition location
- Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
- getOutputs(), addPartitionDesc), conf);
+ Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+ x.getOutputs(), addPartitionDesc), x.getConf());
return addPartTask;
} else {
String srcLocation = partSpec.getLocation();
- fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec);
- LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ fixLocationInPartSpec(fs, tblDesc, table, wh, replicationSpec, partSpec, x);
+ x.getLOG().debug("adding dependent CopyWork/AddPart/MoveWork for partition "
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());
- Path tmpPath = ctx.getExternalTmpPath(tgtLocation);
- Task<?> copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation),
- tmpPath, false), conf);
- Task<?> addPartTask = TaskFactory.get(new DDLWork(getInputs(),
- getOutputs(), addPartitionDesc), conf);
+ Path tmpPath = x.getCtx().getExternalTmpPath(tgtLocation);
+ Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+ replicationSpec, new Path(srcLocation), tmpPath, x.getConf());
+ Task<?> addPartTask = TaskFactory.get(new DDLWork(x.getInputs(),
+ x.getOutputs(), addPartitionDesc), x.getConf());
LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
Utilities.getTableDesc(table),
partSpec.getPartSpec(), true);
loadTableWork.setInheritTableSpecs(false);
Task<?> loadPartTask = TaskFactory.get(new MoveWork(
- getInputs(), getOutputs(), loadTableWork, null, false),
- conf);
+ x.getInputs(), x.getOutputs(), loadTableWork, null, false),
+ x.getConf());
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
- rootTasks.add(copyTask);
+ x.getTasks().add(copyTask);
return addPartTask;
}
}
@@ -413,17 +432,18 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
/**
* Helper method to set location properly in partSpec
*/
- private void fixLocationInPartSpec(
+ private static void fixLocationInPartSpec(
FileSystem fs, CreateTableDesc tblDesc, Table table,
Warehouse wh, ReplicationSpec replicationSpec,
- AddPartitionDesc.OnePartitionDesc partSpec) throws MetaException, HiveException, IOException {
+ AddPartitionDesc.OnePartitionDesc partSpec,
+ EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, HiveException, IOException {
Path tgtPath = null;
if (tblDesc.getLocation() == null) {
if (table.getDataLocation() != null) {
tgtPath = new Path(table.getDataLocation().toString(),
Warehouse.makePartPath(partSpec.getPartSpec()));
} else {
- Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+ Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
tgtPath = new Path(
wh.getTablePath( parentDb, tblDesc.getTableName()),
Warehouse.makePartPath(partSpec.getPartSpec()));
@@ -432,22 +452,23 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
tgtPath = new Path(tblDesc.getLocation(),
Warehouse.makePartPath(partSpec.getPartSpec()));
}
- FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
- checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
+ FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+ checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
partSpec.setLocation(tgtPath.toString());
}
- private void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec)
+ private static void checkTargetLocationEmpty(FileSystem fs, Path targetPath, ReplicationSpec replicationSpec,
+ EximUtil.SemanticAnalyzerWrapperContext x)
throws IOException, SemanticException {
if (replicationSpec.isInReplicationScope()){
// replication scope allows replacement, and does not require empty directories
return;
}
- LOG.debug("checking emptiness of " + targetPath.toString());
+ x.getLOG().debug("checking emptiness of " + targetPath.toString());
if (fs.exists(targetPath)) {
FileStatus[] status = fs.listStatus(targetPath, FileUtils.HIDDEN_FILES_PATH_FILTER);
if (status.length > 0) {
- LOG.debug("Files inc. " + status[0].getPath().toString()
+ x.getLOG().debug("Files inc. " + status[0].getPath().toString()
+ " found in path : " + targetPath.toString());
throw new SemanticException(ErrorMsg.TABLE_DATA_EXISTS.getMsg());
}
@@ -469,7 +490,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
return sb.toString();
}
- private void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec)
+ private static void checkTable(Table table, CreateTableDesc tableDesc, ReplicationSpec replicationSpec, HiveConf conf)
throws SemanticException, URISyntaxException {
// This method gets called only in the scope that a destination table already exists, so
// we're validating if the table is an appropriate destination to import into
@@ -681,25 +702,33 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
/**
* Create tasks for regular import, no repl complexity
+ * @param tblDesc
+ * @param partitionDescs
+ * @param isPartSpecSet
+ * @param replicationSpec
+ * @param table
+ * @param fromURI
+ * @param fs
+ * @param wh
*/
- private void createRegularImportTasks(
- List<Task<? extends Serializable>> rootTasks,
+ private static void createRegularImportTasks(
CreateTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
boolean isPartSpecSet,
ReplicationSpec replicationSpec,
- Table table, URI fromURI, FileSystem fs, Warehouse wh)
+ Table table, URI fromURI, FileSystem fs, Warehouse wh, EximUtil.SemanticAnalyzerWrapperContext x)
throws HiveException, URISyntaxException, IOException, MetaException {
if (table != null){
if (table.isPartitioned()) {
- LOG.debug("table partitioned");
+ x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- if ((ptn = db.getPartition(table, partSpec, false)) == null) {
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+ if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
+ x.getTasks().add(addSinglePartition(
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
} else {
throw new SemanticException(
ErrorMsg.PARTITION_EXISTS.getMsg(partSpecToString(partSpec)));
@@ -707,35 +736,35 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
} else {
- LOG.debug("table non-partitioned");
+ x.getLOG().debug("table non-partitioned");
// ensure if destination is not empty only for regular import
Path tgtPath = new Path(table.getDataLocation().toString());
- FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), conf);
- checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec);
- loadTable(fromURI, table, false, tgtPath);
+ FileSystem tgtFs = FileSystem.get(tgtPath.toUri(), x.getConf());
+ checkTargetLocationEmpty(tgtFs, tgtPath, replicationSpec, x);
+ loadTable(fromURI, table, false, tgtPath, replicationSpec,x);
}
// Set this to read because we can't overwrite any existing partitions
- outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
+ x.getOutputs().add(new WriteEntity(table, WriteEntity.WriteType.DDL_NO_LOCK));
} else {
- LOG.debug("table " + tblDesc.getTableName() + " does not exist");
+ x.getLOG().debug("table " + tblDesc.getTableName() + " does not exist");
- Task<?> t = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), tblDesc), conf);
+ Task<?> t = TaskFactory.get(new DDLWork(x.getInputs(), x.getOutputs(), tblDesc), x.getConf());
table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
- Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+ Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
// Since we are going to be creating a new table in a db, we should mark that db as a write entity
// so that the auth framework can go to work there.
- outputs.add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
+ x.getOutputs().add(new WriteEntity(parentDb, WriteEntity.WriteType.DDL_SHARED));
if (isPartitioned(tblDesc)) {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
}
} else {
- LOG.debug("adding dependent CopyWork/MoveWork for table");
+ x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
- LOG.debug("Importing in place, no emptiness check, no copying/loading");
+ x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
Path dataPath = new Path(fromURI.toString(), "data");
tblDesc.setLocation(dataPath.toString());
} else {
@@ -745,23 +774,24 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
tablePath = wh.getTablePath(parentDb, tblDesc.getTableName());
}
- FileSystem tgtFs = FileSystem.get(tablePath.toUri(), conf);
- checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec);
- t.addDependentTask(loadTable(fromURI, table, false, tablePath));
+ FileSystem tgtFs = FileSystem.get(tablePath.toUri(), x.getConf());
+ checkTargetLocationEmpty(tgtFs, tablePath, replicationSpec,x);
+ t.addDependentTask(loadTable(fromURI, table, false, tablePath, replicationSpec, x));
}
}
- rootTasks.add(t);
+ x.getTasks().add(t);
}
}
/**
* Create tasks for repl import
*/
- private void createReplImportTasks(
- List<Task<? extends Serializable>> rootTasks,
+ private static void createReplImportTasks(
CreateTableDesc tblDesc,
List<AddPartitionDesc> partitionDescs,
- boolean isPartSpecSet, ReplicationSpec replicationSpec, Table table, URI fromURI, FileSystem fs, Warehouse wh)
+ boolean isPartSpecSet, ReplicationSpec replicationSpec, boolean waitOnCreateDb,
+ Table table, URI fromURI, FileSystem fs, Warehouse wh,
+ EximUtil.SemanticAnalyzerWrapperContext x)
throws HiveException, URISyntaxException, IOException, MetaException {
Task dr = null;
@@ -774,7 +804,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// So, we check the repl.last.id of the destination, and if it's newer, we no-op. If it's older, we
// drop and re-create.
if (replicationSpec.allowReplacementInto(table)){
- dr = dropTableTask(table);
+ dr = dropTableTask(table, x);
lockType = WriteEntity.WriteType.DDL_EXCLUSIVE;
table = null; // null it out so we go into the table re-create flow.
} else {
@@ -782,12 +812,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- Database parentDb = db.getDatabase(tblDesc.getDatabaseName());
+ // Normally, on import, trying to create a table or a partition in a db that does not yet exist
+ // is an error condition. However, in the case of a REPL LOAD, it is possible that we are trying
+ // to create tasks to create a table inside a db that as-of-now does not exist, but there is
+ // a precursor Task waiting that will create it before this is encountered. Thus, we instantiate
+ // defaults and do not error out in that case.
+ Database parentDb = x.getHive().getDatabase(tblDesc.getDatabaseName());
if (parentDb == null){
- throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
+ if (!waitOnCreateDb){
+ throw new SemanticException(ErrorMsg.DATABASE_NOT_EXISTS.getMsg(tblDesc.getDatabaseName()));
+ }
}
if (tblDesc.getLocation() == null) {
- tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
+ if (!waitOnCreateDb){
+ tblDesc.setLocation(wh.getTablePath(parentDb, tblDesc.getTableName()).toString());
+ } else {
+ tblDesc.setLocation(
+ wh.getDnsPath(new Path(
+ wh.getDefaultDatabasePath(tblDesc.getDatabaseName()),
+ MetaStoreUtils.encodeTableName(tblDesc.getTableName().toLowerCase())
+ )
+ ).toString());
+
+ }
}
/* Note: In the following section, Metadata-only import handling logic is
@@ -807,49 +854,52 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
lockType = WriteEntity.WriteType.DDL_SHARED;
}
- Task t = createTableTask(tblDesc);
+ Task t = createTableTask(tblDesc, x);
table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
t.addDependentTask(
- addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+ addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
}
} else {
- LOG.debug("adding dependent CopyWork/MoveWork for table");
- t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation())));
+ x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
+ t.addDependentTask(loadTable(fromURI, table, true, new Path(tblDesc.getLocation()),replicationSpec, x));
}
}
if (dr == null){
// Simply create
- rootTasks.add(t);
+ x.getTasks().add(t);
} else {
// Drop and recreate
dr.addDependentTask(t);
- rootTasks.add(dr);
+ x.getTasks().add(dr);
}
} else {
// Table existed, and is okay to replicate into, not dropping and re-creating.
if (table.isPartitioned()) {
- LOG.debug("table partitioned");
+ x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
- if ((ptn = db.getPartition(table, partSpec, false)) == null) {
+ if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
if (!replicationSpec.isMetadataOnly()){
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+ x.getTasks().add(addSinglePartition(
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
}
} else {
// If replicating, then the partition already existing means we need to replace, maybe, if
// the destination ptn's repl.last.id is older than the replacement's.
if (replicationSpec.allowReplacementInto(ptn)){
if (!replicationSpec.isMetadataOnly()){
- rootTasks.add(addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec));
+ x.getTasks().add(addSinglePartition(
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
} else {
- rootTasks.add(alterSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn));
+ x.getTasks().add(alterSinglePartition(
+ fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
@@ -862,31 +912,31 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
// MD-ONLY table alter
- rootTasks.add(alterTableTask(tblDesc));
+ x.getTasks().add(alterTableTask(tblDesc, x));
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
}
}
} else {
- LOG.debug("table non-partitioned");
+ x.getLOG().debug("table non-partitioned");
if (!replicationSpec.allowReplacementInto(table)){
return; // silently return, table is newer than our replacement.
}
if (!replicationSpec.isMetadataOnly()) {
- loadTable(fromURI, table, true, new Path(fromURI)); // repl-imports are replace-into
+ loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
} else {
- rootTasks.add(alterTableTask(tblDesc));
+ x.getTasks().add(alterTableTask(tblDesc, x));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
}
}
}
- outputs.add(new WriteEntity(table,lockType));
+ x.getOutputs().add(new WriteEntity(table,lockType));
}
- private boolean isPartitioned(CreateTableDesc tblDesc) {
+ private static boolean isPartitioned(CreateTableDesc tblDesc) {
return !(tblDesc.getPartCols() == null || tblDesc.getPartCols().isEmpty());
}
@@ -894,7 +944,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
* Utility method that returns a table if one corresponding to the destination
* tblDesc is found. Returns null if no such table is found.
*/
- private Table tableIfExists(CreateTableDesc tblDesc) throws HiveException {
+ private static Table tableIfExists(CreateTableDesc tblDesc, Hive db) throws HiveException {
try {
return db.getTable(tblDesc.getDatabaseName(),tblDesc.getTableName());
} catch (InvalidTableException e) {
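prepareImport() is now the shared static entry point for IMPORT and REPL LOAD: the analyzer's
conf/db/inputs/outputs/rootTasks/LOG/ctx travel in a SemanticAnalyzerWrapperContext, and
waitOnCreateDb=true tells the repl path not to fail when the target db does not exist yet, on the
expectation that an earlier task in the same plan will create it. A minimal sketch of a REPL
LOAD-style call (all variables assumed; the real call site is expected to be in
ReplicationSemanticAnalyzer):

    EximUtil.SemanticAnalyzerWrapperContext x =
        new EximUtil.SemanticAnalyzerWrapperContext(conf, db, inputs, outputs, rootTasks, LOG, ctx);
    boolean tableExisted = ImportSemanticAnalyzer.prepareImport(
        false /* isLocationSet */, false /* isExternalSet */, false /* isPartSpecSet */,
        true  /* waitOnCreateDb: the db may be created by a preceding task */,
        null  /* parsedLocation */, tblName, dbName,
        new LinkedHashMap<String, String>() /* parsedPartSpec */,
        dumpDirUri, x);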
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index a7005f1..6726d44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -69,7 +69,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
@Override
public boolean accept(Path p) {
String name = p.getName();
- return name.equals("_metadata") ? true : !name.startsWith("_") && !name.startsWith(".");
+ return name.equals(EximUtil.METADATA_NAME) ? true : !name.startsWith("_") && !name.startsWith(".");
}
});
if ((srcs != null) && srcs.length == 1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/739ac3af/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
index a17696a..db624c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MetaDataExportListener.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
import org.apache.hadoop.hive.metastore.events.PreEventContext;
import org.apache.hadoop.hive.metastore.events.PreEventContext.PreEventType;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
+
/**
* This class listens for drop events and, if set, exports the table's metadata as JSON to the trash
* of the user performing the drop
@@ -83,7 +83,7 @@ public class MetaDataExportListener extends MetaStorePreEventListener {
} catch (IOException e) {
throw new MetaException(e.getMessage());
}
- Path outFile = new Path(metaPath, name + ImportSemanticAnalyzer.METADATA_NAME);
+ Path outFile = new Path(metaPath, name + EximUtil.METADATA_NAME);
try {
SessionState.getConsole().printInfo("Beginning metadata export");
EximUtil.createExportDump(fs, outFile, mTbl, null, null);