Posted to commits@hive.apache.org by vg...@apache.org on 2016/12/06 19:21:47 UTC
hive git commit: HIVE-15332: REPL LOAD & DUMP support for incremental CREATE_TABLE/ADD_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Repository: hive
Updated Branches:
refs/heads/master 0c8edf053 -> 2f5889c9b
HIVE-15332: REPL LOAD & DUMP support for incremental CREATE_TABLE/ADD_PTN (Sushanth Sowmyan reviewed by Vaibhav Gumashta)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f5889c9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f5889c9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f5889c9
Branch: refs/heads/master
Commit: 2f5889c9b94977b064ba614c89684404cbb9ca63
Parents: 0c8edf0
Author: Vaibhav Gumashta <vg...@hortonworks.com>
Authored: Tue Dec 6 11:16:12 2016 -0800
Committer: Vaibhav Gumashta <vg...@hortonworks.com>
Committed: Tue Dec 6 11:16:12 2016 -0800
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 1 +
itests/hive-unit/pom.xml | 5 +
.../hive/ql/TestReplicationScenarios.java | 129 +++++++-
.../hive/metastore/messaging/EventUtils.java | 22 ++
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 16 +
.../apache/hadoop/hive/ql/metadata/Hive.java | 28 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 19 +-
.../ql/parse/ReplicationSemanticAnalyzer.java | 320 +++++++++++++++----
.../hadoop/hive/ql/parse/ReplicationSpec.java | 8 +
.../hadoop/hive/ql/plan/AddPartitionDesc.java | 22 ++
.../hadoop/hive/ql/plan/CreateTableDesc.java | 21 ++
11 files changed, 516 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 494d01f..119801f 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -332,6 +332,7 @@ public class DbNotificationListener extends MetaStoreEventListener {
private void enqueue(NotificationEvent event) {
if (rs != null) {
synchronized(NOTIFICATION_TBL_LOCK) {
+ LOG.debug("DbNotif:Enqueueing : {}:{}",event.getEventId(),event.getMessage());
rs.addNotificationEvent(event);
}
} else {
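
For context, this listener only feeds the notification log once it is
registered with the metastore. A minimal sketch of that wiring, as the test
change below does it (the fully-qualified class name follows from this
file's package):

    // Register DbNotificationListener so metastore operations are recorded
    // as NotificationEvents that REPL DUMP can later read back.
    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
        "org.apache.hive.hcatalog.listener.DbNotificationListener");
    int msPort = MetaStoreUtils.startMetaStore();
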
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/pom.xml
----------------------------------------------------------------------
diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml
index cd209b4..6a190d1 100644
--- a/itests/hive-unit/pom.xml
+++ b/itests/hive-unit/pom.xml
@@ -81,6 +81,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-server-extensions</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-it-util</artifactId>
<version>${project.version}</version>
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 01abe9b..95db9e8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.Shell;
@@ -55,6 +56,7 @@ public class TestReplicationScenarios {
static Driver driver;
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
+ private ArrayList<String> lastResults;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -69,8 +71,8 @@ public class TestReplicationScenarios {
WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
}
-// System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
-// DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
+ System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+ DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
msPort = MetaStoreUtils.startMetaStore();
hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
@@ -106,6 +108,12 @@ public class TestReplicationScenarios {
// after each test
}
+ private static int next = 0;
+ private static synchronized void advanceDumpDir() {
+ next++;
+ ReplicationSemanticAnalyzer.injectNextDumpDirForTest(String.valueOf(next));
+ }
+
/**
* Tests basic operation - creates a db, with 4 tables, 2 ptned and 2 unptned.
* Inserts data into one of the ptned tables, and one of the unptned tables,
@@ -152,7 +160,7 @@ public class TestReplicationScenarios {
run("SELECT * from " + dbName + ".unptned_empty");
verifyResults(empty);
-
+ advanceDumpDir();
run("REPL DUMP " + dbName);
String replDumpLocn = getResult(0,0);
run("REPL LOAD " + dbName + "_dupe FROM '"+replDumpLocn+"'");
@@ -169,15 +177,116 @@ public class TestReplicationScenarios {
verifyResults(empty);
}
+ @Test
+ public void testIncrementalAdds() throws IOException {
+ String testName = "incrementalAdds";
+ LOG.info("Testing "+testName);
+ String dbName = testName + "_" + tid;
+
+ run("CREATE DATABASE " + dbName);
+
+ run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+ run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName);
+ String replDumpLocn = getResult(0,0);
+ String replDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}",replDumpLocn,replDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+ String[] unptn_data = new String[]{ "eleven" , "twelve" };
+ String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+ String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+ String[] empty = new String[]{};
+
+ String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+ String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+ String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+ createTestDataFile(unptn_locn, unptn_data);
+ createTestDataFile(ptn_locn_1, ptn_data_1);
+ createTestDataFile(ptn_locn_2, ptn_data_2);
+
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+ run("SELECT * from " + dbName + ".unptned_empty");
+ verifyResults(empty);
+
+ run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned");
+ verifyResults(unptn_data);
+ run("CREATE TABLE " + dbName + ".unptned_late AS SELECT * from " + dbName + ".unptned");
+ run("SELECT * from " + dbName + ".unptned_late");
+ verifyResults(unptn_data);
+
+
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)");
+ run("SELECT a from " + dbName + ".ptned WHERE b=2");
+ verifyResults(ptn_data_2);
+
+ // verified up to here.
+ run("CREATE TABLE " + dbName + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=1)");
+ run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned_late PARTITION(b=2)");
+ run("SELECT a from " + dbName + ".ptned_late WHERE b=2");
+ verifyResults(ptn_data_2);
+
+ advanceDumpDir();
+ run("REPL DUMP " + dbName + " FROM " + replDumpId );
+ String incrementalDumpLocn = getResult(0,0);
+ String incrementalDumpId = getResult(0,1,true);
+ LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+ run("REPL LOAD " + dbName + "_dupe FROM '"+incrementalDumpLocn+"'");
+
+ run("SELECT * from " + dbName + "_dupe.unptned_empty");
+ verifyResults(empty);
+ run("SELECT a from " + dbName + ".ptned_empty");
+ verifyResults(empty);
+
+
+// this does not work because LOAD DATA LOCAL INPATH into an unptned table seems
+// to use ALTER_TABLE only - it does not emit an INSERT or CREATE - re-enable after
+// fixing that.
+// run("SELECT * from " + dbName + "_dupe.unptned");
+// verifyResults(unptn_data);
+ run("SELECT * from " + dbName + "_dupe.unptned_late");
+ verifyResults(unptn_data);
+
+
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+ verifyResults(ptn_data_2);
+
+ // verified up to here.
+ run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1");
+ verifyResults(ptn_data_1);
+ run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
+ verifyResults(ptn_data_2);
+ }
+
private String getResult(int rowNum, int colNum) throws IOException {
- List<String> results = new ArrayList<String>();
- try {
- driver.getResults(results);
- } catch (CommandNeedRetryException e) {
- e.printStackTrace();
- throw new RuntimeException(e);
+ return getResult(rowNum,colNum,false);
+ }
+ private String getResult(int rowNum, int colNum, boolean reuse) throws IOException {
+ if (!reuse) {
+ lastResults = new ArrayList<String>();
+ try {
+ driver.getResults(lastResults);
+ } catch (CommandNeedRetryException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
}
- return (results.get(rowNum).split("\\001"))[colNum];
+ return (lastResults.get(rowNum).split("\\001"))[colNum];
}
private void verifyResults(String[] data) throws IOException {
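
Condensed from testIncrementalAdds above, the bootstrap-then-incremental
cycle the test exercises, using the test's own run()/getResult()/
advanceDumpDir() helpers:

    advanceDumpDir();
    run("REPL DUMP " + dbName);                      // bootstrap dump
    String dumpLocn = getResult(0, 0);               // dump directory
    String dumpId = getResult(0, 1, true);           // last replicated event id
    run("REPL LOAD " + dbName + "_dupe FROM '" + dumpLocn + "'");
    // ... further DDL/DML on the source db ...
    advanceDumpDir();
    run("REPL DUMP " + dbName + " FROM " + dumpId);  // incremental: events from dumpId on
    run("REPL LOAD " + dbName + "_dupe FROM '" + getResult(0, 0) + "'");
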
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
index 932af7e..927bf15 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/EventUtils.java
@@ -63,6 +63,28 @@ public class EventUtils {
};
}
+ public static IMetaStoreClient.NotificationFilter getEventBoundaryFilter(final Long eventFrom, final Long eventTo){
+ return new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ if ( (event == null) || (event.getEventId() < eventFrom) || (event.getEventId() > eventTo)) {
+ return false;
+ }
+ return true;
+ }
+ };
+ }
+
+ public static IMetaStoreClient.NotificationFilter andFilter(
+ final IMetaStoreClient.NotificationFilter filter1,
+ final IMetaStoreClient.NotificationFilter filter2) {
+ return new IMetaStoreClient.NotificationFilter() {
+ @Override
+ public boolean accept(NotificationEvent event) {
+ return filter1.accept(event) && filter2.accept(event);
+ }
+ };
+ }
public interface NotificationFetcher {
public int getBatchSize() throws IOException;
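
A usage sketch for the two new helpers, mirroring how the REPL DUMP path in
ReplicationSemanticAnalyzer (further down in this patch) composes them;
client, dbPattern, tblPattern, eventFrom, eventTo and batchSize stand in
for caller-supplied values:

    // Accept only events that match the db/table pattern AND fall inside
    // the [eventFrom, eventTo] id range.
    IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
        EventUtils.getDbTblNotificationFilter(dbPattern, tblPattern),
        EventUtils.getEventBoundaryFilter(eventFrom, eventTo));
    EventUtils.MSClientNotificationFetcher evFetcher =
        new EventUtils.MSClientNotificationFetcher(client);
    EventUtils.NotificationEventIterator evIter =
        new EventUtils.NotificationEventIterator(evFetcher, eventFrom, batchSize, evFilter);
    while (evIter.hasNext()) {
      NotificationEvent ev = evIter.next();  // only events passing both filters
    }
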
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 0ac9053..4b39eb9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -4084,6 +4084,22 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
LOG.info("creating table " + tbl.getDbName() + "." + tbl.getTableName() + " on " +
tbl.getDataLocation());
+ if (crtTbl.getReplicationSpec().isInReplicationScope() && (!crtTbl.getReplaceMode())){
+ // if this is a replication spec, then replace-mode semantics might apply.
+ // if we're already asking for a table replacement, then we can skip this check.
+ // however, otherwise, if in replication scope, and we've not been explicitly asked
+ // to replace, we should check if the object we're looking at exists, and if so,
+ // trigger replace-mode semantics.
+ Table existingTable = db.getTable(tbl.getDbName(), tbl.getTableName(), false);
+ if (existingTable != null){
+ if (!crtTbl.getReplicationSpec().allowEventReplacementInto(existingTable)){
+ return 0; // no replacement, the existing table state is newer than our update.
+ } else {
+ crtTbl.setReplaceMode(true); // we replace existing table.
+ }
+ }
+ }
+
// create the table
if (crtTbl.getReplaceMode()){
// replace-mode creates are really alters using CreateTableDesc.
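
The net effect is that a replicated CREATE TABLE replays idempotently. A
distilled sketch of the decision, with names as in the hunk above:

    Table existing = db.getTable(tbl.getDbName(), tbl.getTableName(), false);
    if (existing != null) {
      if (!crtTbl.getReplicationSpec().allowEventReplacementInto(existing)) {
        return 0;                   // existing state is newer: stale event, no-op
      }
      crtTbl.setReplaceMode(true);  // event is newer: the create becomes a replace
    }
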
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 8f230fc..e477f24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2016,18 +2016,40 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
List<Partition> out = new ArrayList<Partition>();
try {
- if (!addPartitionDesc.getReplaceMode()){
+ if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){
// TODO: normally, the result is not necessary; might make sense to pass false
for (org.apache.hadoop.hive.metastore.api.Partition outPart
: getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) {
out.add(new Partition(tbl, outPart));
}
} else {
- getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), in, null);
- List<String> part_names = new ArrayList<String>();
+
+ // For replication add-ptns, we need to follow an insert-if-not-exists, alter-if-exists approach.
+ // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have
+ // no choice but to iterate over the partitions here.
+
+ List<org.apache.hadoop.hive.metastore.api.Partition> partsToAdd = new ArrayList<>();
+ List<org.apache.hadoop.hive.metastore.api.Partition> partsToAlter = new ArrayList<>();
+ List<String> part_names = new ArrayList<>();
for (org.apache.hadoop.hive.metastore.api.Partition p: in){
part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues()));
+ try {
+ org.apache.hadoop.hive.metastore.api.Partition ptn =
+ getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues());
+ if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn)){
+ partsToAlter.add(p);
+ } // else ptn already exists, but we do nothing with it.
+ } catch (NoSuchObjectException nsoe){
+ // if the object does not exist, we want to add it.
+ partsToAdd.add(p);
+ }
}
+ // don't collect results here: getPartitionsByNames below fetches every
+ // partition in part_names (added and altered alike), so collecting the
+ // add_partitions results too would duplicate the new ones in 'out'.
+ getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), false);
+ getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), partsToAlter, null);
+
+
for ( org.apache.hadoop.hive.metastore.api.Partition outPart :
getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){
out.add(new Partition(tbl,outPart));
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 3420efd..ce952c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -204,6 +204,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
// Executed if relevant, and used to contain all the other details about the table if not.
CreateTableDesc tblDesc = getBaseCreateTableDescFromTable(dbname,rv.getTable());
+ if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
+ tblDesc.setReplicationSpec(replicationSpec);
+ }
+
if (isExternalSet){
tblDesc.setExternal(isExternalSet);
// This condition-check could have been avoided, but to honour the old
@@ -368,8 +372,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
private static Task<? extends Serializable> alterTableTask(CreateTableDesc tableDesc,
- EximUtil.SemanticAnalyzerWrapperContext x) {
+ EximUtil.SemanticAnalyzerWrapperContext x, ReplicationSpec replicationSpec) {
tableDesc.setReplaceMode(true);
+ if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+ tableDesc.setReplicationSpec(replicationSpec);
+ }
return TaskFactory.get(new DDLWork(
x.getInputs(),
x.getOutputs(),
@@ -383,6 +390,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
EximUtil.SemanticAnalyzerWrapperContext x) {
addPartitionDesc.setReplaceMode(true);
+ if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())){
+ addPartitionDesc.setReplicationSpec(replicationSpec);
+ }
addPartitionDesc.getPartition(0).setLocation(ptn.getLocation()); // use existing location
return TaskFactory.get(new DDLWork(
x.getInputs(),
@@ -860,6 +870,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()) {
if (isPartitioned(tblDesc)) {
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
+ addPartitionDesc.setReplicationSpec(replicationSpec);
t.addDependentTask(
addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x));
}
@@ -881,7 +892,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (table.isPartitioned()) {
x.getLOG().debug("table partitioned");
for (AddPartitionDesc addPartitionDesc : partitionDescs) {
-
+ addPartitionDesc.setReplicationSpec(replicationSpec);
Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
@@ -912,7 +923,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
}
if (replicationSpec.isMetadataOnly() && partitionDescs.isEmpty()){
// MD-ONLY table alter
- x.getTasks().add(alterTableTask(tblDesc, x));
+ x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
}
@@ -925,7 +936,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
if (!replicationSpec.isMetadataOnly()) {
loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
} else {
- x.getTasks().add(alterTableTask(tblDesc, x));
+ x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 938355e..8007c4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hive.ql.parse;
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -25,25 +28,26 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
-import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
-import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
import org.apache.hadoop.hive.metastore.messaging.EventUtils;
-import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
+import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.io.IOUtils;
+import javax.annotation.Nullable;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -63,12 +67,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private String dbNameOrPattern;
// Table name or pattern
private String tblNameOrPattern;
- private Integer eventFrom;
- private Integer eventTo;
+ private Long eventFrom;
+ private Long eventTo;
private Integer batchSize;
// Base path for REPL LOAD
private String path;
+ private static String testInjectDumpDir = null; // unit tests can override this to affect default dump behaviour
+
public ReplicationSemanticAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@@ -114,13 +120,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
} else {
// TOK_FROM subtree
Tree fromNode = ast.getChild(currNode);
- eventFrom = Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
+ eventFrom = Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(0).getText()));
// skip the first, which is always required
int numChild = 1;
while (numChild < fromNode.getChildCount()) {
if (fromNode.getChild(numChild).getType() == TOK_TO) {
eventTo =
- Integer.parseInt(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
+ Long.parseLong(PlanUtils.stripQuotes(fromNode.getChild(numChild + 1).getText()));
// skip the next child, since we already took care of it
numChild++;
} else if (fromNode.getChild(numChild).getType() == TOK_BATCH) {
@@ -142,38 +148,212 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// REPL DUMP
private void analyzeReplDump(ASTNode ast) throws SemanticException {
- // FIXME: support non-bootstrap: use eventFrom/eventTo/batchSize
LOG.debug("ReplicationSemanticAnalyzer.analyzeReplDump: " + String.valueOf(dbNameOrPattern)
+ "." + String.valueOf(tblNameOrPattern) + " from " + String.valueOf(eventFrom) + " to "
+ String.valueOf(eventTo) + " batchsize " + String.valueOf(batchSize));
String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
Path dumpRoot = new Path(replRoot, getNextDumpDir());
+ Path dumpMetadata = new Path(dumpRoot,"_dumpmetadata");
try {
- for (String dbName : matchesDb(dbNameOrPattern)) {
- LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
- Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
- for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
- LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
- + " to db root " + dbRoot.toUri());
- dumpTbl(ast, dbName, tblName, dbRoot);
+ if (eventFrom == null){
+ // bootstrap case
+ String bootDumpBeginReplId =
+ String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ for (String dbName : matchesDb(dbNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
+ Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+ for (String tblName : matchesTbl(dbName, tblNameOrPattern)) {
+ LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping table: " + tblName
+ + " to db root " + dbRoot.toUri());
+ dumpTbl(ast, dbName, tblName, dbRoot);
+ }
+ }
+ String bootDumpEndReplId =
+ String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
+ LOG.info("Bootstrap object dump phase took from {} to {}",bootDumpBeginReplId, bootDumpEndReplId);
+
+ // Now that bootstrap has dumped all objects related, we have to account for the changes
+ // that occurred while bootstrap was happening - i.e. we have to look through all events
+ // during the bootstrap period and consolidate them with our dump.
+
+ IMetaStoreClient.NotificationFilter evFilter =
+ EventUtils.getDbTblNotificationFilter(dbNameOrPattern, tblNameOrPattern);
+ EventUtils.MSClientNotificationFetcher evFetcher =
+ new EventUtils.MSClientNotificationFetcher(db.getMSC());
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, Long.valueOf(bootDumpBeginReplId),
+ Ints.checkedCast(Long.valueOf(bootDumpEndReplId) - Long.valueOf(bootDumpBeginReplId) + 1),
+ evFilter );
+
+ // Now we consolidate all the events that happened during the object dump into the dump itself
+ while (evIter.hasNext()){
+ NotificationEvent ev = evIter.next();
+ Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+ // FIXME : implement consolidateEvent(..) similar to dumpEvent(ev,evRoot)
}
+ LOG.info("Consolidation done, preparing to return {},{}",dumpRoot.toUri(),bootDumpEndReplId);
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), bootDumpEndReplId),
+ "dump_dir,last_repl_id#string,string");
+ } else {
+ // get list of events matching dbPattern & tblPattern
+ // go through each event, and dump out each event to a event-level dump dir inside dumproot
+ if (eventTo == null){
+ eventTo = db.getMSC().getCurrentNotificationEventId().getEventId();
+ LOG.debug("eventTo not specified, using current event id : {}", eventTo);
+ }
+
+ Integer maxRange = Ints.checkedCast(eventTo - eventFrom + 1);
+ if (batchSize == null){
+ batchSize = maxRange;
+ } else {
+ if (batchSize > maxRange){
+ batchSize = maxRange;
+ }
+ }
+
+ IMetaStoreClient.NotificationFilter evFilter = EventUtils.andFilter(
+ EventUtils.getDbTblNotificationFilter(dbNameOrPattern,tblNameOrPattern),
+ EventUtils.getEventBoundaryFilter(eventFrom, eventTo));
+
+ EventUtils.MSClientNotificationFetcher evFetcher
+ = new EventUtils.MSClientNotificationFetcher(db.getMSC());
+
+ EventUtils.NotificationEventIterator evIter = new EventUtils.NotificationEventIterator(
+ evFetcher, eventFrom, batchSize, evFilter);
+
+ while (evIter.hasNext()){
+ NotificationEvent ev = evIter.next();
+ Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
+ dumpEvent(ev,evRoot);
+ }
+
+ LOG.info("Done dumping events, preparing to return {},{}",dumpRoot.toUri(),eventTo);
+ writeOutput(Arrays.asList("event", String.valueOf(eventFrom), String.valueOf(eventTo)), dumpMetadata);
+ prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), String.valueOf(eventTo)),
+ "dump_dir,last_repl_id#string,string");
}
- String currentReplId =
- String.valueOf(db.getMSC().getCurrentNotificationEventId().getEventId());
- prepareReturnValues(Arrays.asList(dumpRoot.toUri().toString(), currentReplId),
- "dump_dir,last_repl_id#string,string");
} catch (Exception e) {
// TODO : simple wrap & rethrow for now, clean up with error codes
+ LOG.warn("Error during analyzeReplDump",e);
throw new SemanticException(e);
}
}
+ private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
+ long evid = ev.getEventId();
+ String evidStr = String.valueOf(evid);
+ ReplicationSpec replicationSpec = getNewReplicationSpec(evidStr, evidStr);
+ switch (ev.getEventType()){
+ case MessageFactory.CREATE_TABLE_EVENT : {
+ LOG.info("Processing#{} CREATE_TABLE message : {}",ev.getEventId(),ev.getMessage());
+
+ // FIXME : Current MessageFactory api is lacking,
+ // and impl is in JSONMessageFactory instead. This needs to be
+ // refactored correctly so we don't depend on a specific impl.
+ org.apache.hadoop.hive.metastore.api.Table tobj =
+ JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev));
+ if (tobj == null){
+ LOG.debug("Event#{} was a CREATE_TABLE_EVENT with no table listed");
+ break;
+ }
+
+ Table qlMdTable = new Table(tobj);
+
+ Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(conf),
+ metaDataPath,
+ qlMdTable,
+ null,
+ replicationSpec);
+
+ // FIXME : dump _files should happen at dbnotif time, doing it here is incorrect
+ // we will, however, do so here, now, for dev/debug's sake.
+ Path dataPath = new Path(evRoot,"data");
+ rootTasks.add(ReplCopyTask.getDumpCopyTask(replicationSpec, qlMdTable.getPath(), dataPath, conf));
+
+ break;
+ }
+ case MessageFactory.ADD_PARTITION_EVENT : {
+ LOG.info("Processing#{} ADD_PARTITION message : {}",ev.getEventId(),ev.getMessage());
+ // FIXME : Current MessageFactory api is lacking,
+ // and impl is in JSONMessageFactory instead. This needs to be
+ // refactored correctly so we don't depend on a specific impl.
+ List<org.apache.hadoop.hive.metastore.api.Partition> ptnObjs =
+ JSONMessageFactory.getPartitionObjList(JSONMessageFactory.getJsonTree(ev));
+ if ((ptnObjs == null) || (ptnObjs.size() == 0)) {
+ LOG.debug("Event#{} was an ADD_PTN_EVENT with no partitions");
+ break;
+ }
+ org.apache.hadoop.hive.metastore.api.Table tobj =
+ JSONMessageFactory.getTableObj(JSONMessageFactory.getJsonTree(ev));
+ if (tobj == null){
+ LOG.debug("Event#{} was a ADD_PTN_EVENT with no table listed");
+ break;
+ }
+
+ final Table qlMdTable = new Table(tobj);
+ List<Partition> qlPtns = Lists.transform(
+ ptnObjs,
+ new Function<org.apache.hadoop.hive.metastore.api.Partition, Partition>() {
+ @Nullable
+ @Override
+ public Partition apply(@Nullable org.apache.hadoop.hive.metastore.api.Partition input) {
+ if (input == null){
+ return null;
+ }
+ try {
+ return new Partition(qlMdTable,input);
+ } catch (HiveException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+ );
+
+ Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+ EximUtil.createExportDump(
+ metaDataPath.getFileSystem(conf),
+ metaDataPath,
+ qlMdTable,
+ qlPtns,
+ replicationSpec);
+
+ // FIXME : dump _files should ideally happen at dbnotif time, doing it here introduces
+ // rubberbanding. But, till we have support for that, this is our closest equivalent
+ for (Partition qlPtn : qlPtns){
+ Path ptnDataPath = new Path(evRoot,qlPtn.getName());
+ rootTasks.add(ReplCopyTask.getDumpCopyTask(
+ replicationSpec, qlPtn.getPartitionPath(), ptnDataPath, conf));
+ }
+
+ break;
+ }
+ default:
+ LOG.info("Skipping processing#{} message : {}",ev.getEventId(), ev.getMessage());
+ // TODO : handle other event types
+ break;
+ }
+
+ }
+
+ public static void injectNextDumpDirForTest(String dumpdir){
+ testInjectDumpDir = dumpdir;
+ }
+
String getNextDumpDir() {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
- return "next";
- // make it easy to write unit tests, instead of unique id generation.
+ // make it easy to write .q unit tests, instead of unique id generation.
// however, this does mean that in writing tests, we have to be aware that
// repl dump will clash with prior dumps, and thus have to clean up properly.
+ if (testInjectDumpDir == null){
+ return "next";
+ } else {
+ return testInjectDumpDir;
+ }
} else {
return String.valueOf(System.currentTimeMillis());
// TODO: time good enough for now - we'll likely improve this.
@@ -259,14 +439,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// looking at each db, and then each table, and then setting up the appropriate
// import job in its place.
- // FIXME : handle non-bootstrap cases.
-
- // We look at the path, and go through each subdir.
- // Each subdir corresponds to a database.
- // For each subdir, there is a _metadata file which allows us to re-impress the db object
- // After each db object is loaded appropriately, iterate through the sub-table dirs, and pretend
- // that we had an IMPORT on each of them, into this db.
-
try {
Path loadPath = new Path(path);
@@ -277,25 +449,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new FileNotFoundException(loadPath.toUri().toString());
}
- // Now, the dumped path can be one of two things:
+ // Now, the dumped path can be one of three things:
// a) It can be a db dump, in which case we expect a set of dirs, each with a
// db name, and with a _metadata file in each, and table dirs inside that.
// b) It can be a table dump dir, in which case we expect a _metadata dump of
// a table in question in the dir, and individual ptn dir hierarchy.
- // Once we expand this into doing incremental repl, we can have individual events which can
- // be other things like roles and fns as well. Also, if tblname is specified, we're guaranteed
- // that this is a tbl-level dump, and it is an error condition if we find anything else. Also,
- // if dbname is specified, we expect exactly one db dumped, and having more is an error
- // condition.
+ // c) A dump can be an event-level dump, in which case we expect several subdirs,
+ // each named for its event id and each corresponding to a single event-level
+ // dump. Currently, only CREATE_TABLE and ADD_PARTITION are handled, so all of
+ // these dumps will be at a table/ptn level.
+
+ // For incremental repl, eventually, we can have individual events which can
+ // be other things like roles and fns as well.
+
+ boolean evDump = false;
+ Path dumpMetadata = new Path(loadPath, "_dumpmetadata");
+ // TODO : only event dumps currently have _dumpmetadata - this might change. Generify.
+ if (fs.exists(dumpMetadata)){
+ LOG.debug("{} exists, this is a event dump", dumpMetadata);
+ evDump = true;
+ } else {
+ LOG.debug("{} does not exist, this is an object dump", dumpMetadata);
+ }
- if ((tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+ if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
+ // not an event dump, and table name pattern specified, this has to be a tbl-level dump
analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, path, null);
return;
}
FileStatus[] srcs = LoadSemanticAnalyzer.matchFilesOrDir(fs, loadPath);
if (srcs == null || (srcs.length == 0)) {
- throw new FileNotFoundException(loadPath.toUri().toString());
+ LOG.warn("Nothing to load at {}",loadPath.toUri().toString());
+ return;
}
FileStatus[] dirsInLoadPath = fs.listStatus(loadPath, EximUtil.getDirectoryFilter(fs));
@@ -304,19 +490,31 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new IllegalArgumentException("No data to load in path " + loadPath.toUri().toString());
}
- if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
- LOG.debug("Found multiple dirs when we expected 1:");
- for (FileStatus d : dirsInLoadPath) {
- LOG.debug("> " + d.getPath().toUri().toString());
+ if (!evDump){
+ // not an event dump, not a table dump - thus, a db dump
+ if ((dbNameOrPattern != null) && (dirsInLoadPath.length > 1)) {
+ LOG.debug("Found multiple dirs when we expected 1:");
+ for (FileStatus d : dirsInLoadPath) {
+ LOG.debug("> " + d.getPath().toUri().toString());
+ }
+ throw new IllegalArgumentException(
+ "Multiple dirs in "
+ + loadPath.toUri().toString()
+ + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
}
- throw new IllegalArgumentException(
- "Multiple dirs in "
- + loadPath.toUri().toString()
- + " does not correspond to REPL LOAD expecting to load to a singular destination point.");
- }
- for (FileStatus dir : dirsInLoadPath) {
- analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+ for (FileStatus dir : dirsInLoadPath) {
+ analyzeDatabaseLoad(dbNameOrPattern, fs, dir);
+ }
+ } else {
+ // event dump, each subdir is an individual event dump.
+ for (FileStatus dir : dirsInLoadPath){
+ // event loads behave similarly to table loads, with one crucial difference:
+ // ordering is strict, and each event must be processed after the previous one.
+ LOG.debug("Loading event from {} to {}.{}", dir.getPath().toUri(), dbNameOrPattern, tblNameOrPattern);
+ analyzeTableLoad(dbNameOrPattern, tblNameOrPattern, dir.getPath().toUri().toString(), null);
+ // FIXME: we should have a strict order of execution so that each event's tasks occur linearly
+ }
}
} catch (Exception e) {
@@ -326,6 +524,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
+ private void analyzeEventLoad(String dbNameOrPattern, String tblNameOrPattern,
+ FileSystem fs, FileStatus dir) throws SemanticException {
+ // stub: event-specific load handling will land here; for now, analyzeReplLoad
+ // routes each event dir through analyzeTableLoad (see FIXME on ordering above).
+ }
+
private void analyzeDatabaseLoad(String dbName, FileSystem fs, FileStatus dir)
throws SemanticException {
try {
@@ -486,11 +689,10 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
// If we do call it, then FetchWork winds up treating the "table" here as a
// partitioned dir, which does not work.
- writeOutput(values);
+ writeOutput(values,ctx.getResFile());
}
- private void writeOutput(List<String> values) throws SemanticException {
- Path outputFile = ctx.getResFile();
+ private void writeOutput(List<String> values, Path outputFile) throws SemanticException {
FileSystem fs = null;
DataOutputStream outStream = null;
try {
@@ -499,7 +701,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
outStream.writeBytes((values.get(0) == null ? Utilities.nullStringOutput : values.get(0)));
for (int i = 1; i < values.size(); i++) {
outStream.write(Utilities.ctrlaCode);
- outStream.writeBytes((values.get(1) == null ? Utilities.nullStringOutput : values.get(1)));
+ outStream.writeBytes((values.get(i) == null ? Utilities.nullStringOutput : values.get(i)));
}
outStream.write(Utilities.newLineCode);
} catch (IOException e) {
@@ -513,17 +715,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private ReplicationSpec getNewReplicationSpec() throws SemanticException {
try {
- ReplicationSpec replicationSpec =
- new ReplicationSpec(true, false, "replv2", "will-be-set", false, true);
- replicationSpec.setCurrentReplicationState(String.valueOf(db.getMSC()
+ ReplicationSpec rspec = getNewReplicationSpec("replv2","will-be-set");
+ rspec.setCurrentReplicationState(String.valueOf(db.getMSC()
.getCurrentNotificationEventId().getEventId()));
- return replicationSpec;
+ return rspec;
} catch (Exception e) {
- throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
- // codes
+ throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error codes
}
}
+ private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
+ return new ReplicationSpec(true, false, evState, objState, false, true);
+ }
+
private Iterable<? extends String> matchesTbl(String dbName, String tblPattern)
throws HiveException {
if (tblPattern == null) {
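
Putting the pieces together, an incremental dump produced by the code above
lays out roughly as follows (a sketch; the paths come from dumpEvent(), and
EximUtil.METADATA_NAME is the usual "_metadata" file):

    Path dumpRoot     = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), getNextDumpDir());
    Path dumpMetadata = new Path(dumpRoot, "_dumpmetadata");       // "event" + from/to range
    Path evRoot       = new Path(dumpRoot, String.valueOf(ev.getEventId()));
    Path evMetadata   = new Path(evRoot, EximUtil.METADATA_NAME);  // exported table/ptn metadata
    Path tblData      = new Path(evRoot, "data");                  // CREATE_TABLE file dump
    Path ptnData      = new Path(evRoot, qlPtn.getName());         // one per ADD_PARTITION ptn
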
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 824cf11..060f2a1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -179,6 +179,14 @@ public class ReplicationSpec {
}
/**
* Determines if a current replication object (current state of dump) is allowed to
+ * replicate-replace-into a given partition
+ */
+ public boolean allowReplacementInto(org.apache.hadoop.hive.metastore.api.Partition ptn){
+ return allowReplacement(getLastReplicatedStateFromParameters(ptn.getParameters()), this.getCurrentReplicationState());
+ }
+
+ /**
* Determines if a current replication event specification is allowed to
* replicate-replace-into a given partition
*/
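
A usage sketch for the new overload, mirroring the Hive.createPartitions
change earlier in this patch; existingPtn, partsToAlter and p stand in for
the caller's variables:

    // Replace only if the dump's replication state is newer than the state
    // the partition last recorded in its parameters; otherwise the event is
    // stale and must be skipped for idempotent replay.
    if (addPartitionDesc.getReplicationSpec().allowReplacementInto(existingPtn)) {
      partsToAlter.add(p);  // alter-if-exists
    }
    // else: the existing partition already reflects a later state; no-op.
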
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
index 7a583c3..6ffd94a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
@@ -25,12 +25,14 @@ import java.util.Map;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
/**
* Contains the information needed to add one or more partitions.
*/
public class AddPartitionDesc extends DDLDesc implements Serializable {
+
public static class OnePartitionDesc {
public OnePartitionDesc() {}
@@ -152,6 +154,7 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
boolean ifNotExists;
List<OnePartitionDesc> partitions = null;
boolean replaceMode = false;
+ private ReplicationSpec replicationSpec = null;
/**
@@ -302,4 +305,23 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
public boolean getReplaceMode() {
return this.replaceMode;
}
+
+ /**
+ * @param replicationSpec Sets the replication spec governing this create.
+ * This parameter will have meaningful values only for creates happening as a result of a replication.
+ */
+ public void setReplicationSpec(ReplicationSpec replicationSpec) {
+ this.replicationSpec = replicationSpec;
+ }
+
+ /**
+ * @return the replication scope this partition-add is running under.
+ * This can result in a "CREATE/REPLACE IF NEWER THAN" kind of semantic
+ */
+ public ReplicationSpec getReplicationSpec(){
+ if (replicationSpec == null){
+ this.replicationSpec = new ReplicationSpec();
+ }
+ return this.replicationSpec;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/2f5889c9/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
index 60858e6..4f614a8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
import org.apache.hadoop.hive.serde.serdeConstants;
@@ -91,6 +92,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
boolean isTemporary = false;
private boolean isMaterialization = false;
private boolean replaceMode = false;
+ private ReplicationSpec replicationSpec = null;
private boolean isCTAS = false;
List<SQLPrimaryKey> primaryKeys;
List<SQLForeignKey> foreignKeys;
@@ -646,6 +648,25 @@ public class CreateTableDesc extends DDLDesc implements Serializable {
return replaceMode;
}
+ /**
+ * @param replicationSpec Sets the replication spec governing this create.
+ * This parameter will have meaningful values only for creates happening as a result of a replication.
+ */
+ public void setReplicationSpec(ReplicationSpec replicationSpec) {
+ this.replicationSpec = replicationSpec;
+ }
+
+ /**
+ * @return the replication scope this create is running under.
+ * This can result in a "CREATE/REPLACE IF NEWER THAN" kind of semantic
+ */
+ public ReplicationSpec getReplicationSpec(){
+ if (replicationSpec == null){
+ this.replicationSpec = new ReplicationSpec();
+ }
+ return this.replicationSpec;
+ }
+
public boolean isCTAS() {
return isCTAS;
}