Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/03 10:02:55 UTC
[16/16] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f519db7e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f519db7e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f519db7e
Branch: refs/heads/master
Commit: f519db7eafacb4b4d2d9fe2a9e10e908d8077224
Parents: 285a9b4
Author: Sankar Hariappan <sa...@apache.org>
Authored: Tue Jul 3 15:32:05 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Jul 3 15:32:05 2018 +0530
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 209 +-
.../listener/DummyRawStoreFailEvent.java | 15 +
.../listener/TestDbNotificationListener.java | 5 +
.../hive/ql/parse/TestReplicationScenarios.java | 72 -
.../TestReplicationScenariosAcidTables.java | 602 ++-
...TestReplicationScenariosAcrossInstances.java | 15 +-
.../hadoop/hive/ql/parse/WarehouseInstance.java | 5 +
.../metastore/SynchronizedMetaStoreClient.java | 5 +
.../apache/hadoop/hive/ql/exec/MoveTask.java | 6 +-
.../hadoop/hive/ql/exec/ReplCopyTask.java | 5 +-
.../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 31 +-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 4 +-
.../IncrementalLoadTasksBuilder.java | 73 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 37 +-
.../hadoop/hive/ql/io/HiveInputFormat.java | 24 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 18 +-
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 7 +-
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 10 +-
.../apache/hadoop/hive/ql/metadata/Hive.java | 167 +-
.../hadoop/hive/ql/metadata/HiveUtils.java | 11 +-
.../hive/ql/parse/ImportSemanticAnalyzer.java | 83 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 8 +-
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 16 +-
.../hadoop/hive/ql/parse/repl/CopyUtils.java | 2 +-
.../hadoop/hive/ql/parse/repl/dump/Utils.java | 4 -
.../repl/dump/events/CommitTxnHandler.java | 125 +-
.../ql/parse/repl/dump/events/EventHandler.java | 23 +-
.../parse/repl/dump/events/InsertHandler.java | 4 +
.../parse/repl/load/UpdatedMetaDataTracker.java | 124 +-
.../repl/load/message/AbortTxnHandler.java | 7 +-
.../repl/load/message/AllocWriteIdHandler.java | 2 +-
.../repl/load/message/CommitTxnHandler.java | 78 +-
.../parse/repl/load/message/MessageHandler.java | 8 +-
.../parse/repl/load/message/OpenTxnHandler.java | 7 +-
.../apache/hadoop/hive/ql/plan/MoveWork.java | 12 +-
.../apache/hadoop/hive/ql/plan/ReplTxnWork.java | 15 +
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2675 +++++++-----
.../gen/thrift/gen-cpp/ThriftHiveMetastore.h | 126 +
.../ThriftHiveMetastore_server.skeleton.cpp | 5 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3905 ++++++++++--------
.../gen/thrift/gen-cpp/hive_metastore_types.h | 218 +-
.../metastore/api/AddDynamicPartitions.java | 32 +-
.../api/AllocateTableWriteIdsRequest.java | 68 +-
.../api/AllocateTableWriteIdsResponse.java | 36 +-
.../metastore/api/ClearFileMetadataRequest.java | 32 +-
.../hive/metastore/api/ClientCapabilities.java | 32 +-
.../hive/metastore/api/CommitTxnRequest.java | 168 +-
.../hive/metastore/api/CompactionRequest.java | 44 +-
.../hive/metastore/api/CreationMetadata.java | 32 +-
.../metastore/api/FindSchemasByColsResp.java | 36 +-
.../hive/metastore/api/FireEventRequest.java | 32 +-
.../metastore/api/GetAllFunctionsResponse.java | 36 +-
.../api/GetFileMetadataByExprRequest.java | 32 +-
.../api/GetFileMetadataByExprResult.java | 48 +-
.../metastore/api/GetFileMetadataRequest.java | 32 +-
.../metastore/api/GetFileMetadataResult.java | 44 +-
.../hive/metastore/api/GetTablesRequest.java | 32 +-
.../hive/metastore/api/GetTablesResult.java | 36 +-
.../metastore/api/GetValidWriteIdsRequest.java | 32 +-
.../metastore/api/GetValidWriteIdsResponse.java | 36 +-
.../api/HeartbeatTxnRangeResponse.java | 64 +-
.../metastore/api/InsertEventRequestData.java | 227 +-
.../hadoop/hive/metastore/api/LockRequest.java | 36 +-
.../hive/metastore/api/Materialization.java | 32 +-
.../api/NotificationEventResponse.java | 36 +-
.../metastore/api/PutFileMetadataRequest.java | 64 +-
.../api/ReplTblWriteIdStateRequest.java | 32 +-
.../hive/metastore/api/SchemaVersion.java | 36 +-
.../hive/metastore/api/ShowCompactResponse.java | 36 +-
.../hive/metastore/api/ShowLocksResponse.java | 36 +-
.../hive/metastore/api/TableValidWriteIds.java | 32 +-
.../hive/metastore/api/ThriftHiveMetastore.java | 3468 ++++++++++------
.../hive/metastore/api/WMFullResourcePlan.java | 144 +-
.../api/WMGetAllResourcePlanResponse.java | 36 +-
.../WMGetTriggersForResourePlanResponse.java | 36 +-
.../api/WMValidateResourcePlanResponse.java | 64 +-
.../hive/metastore/api/WriteEventInfo.java | 1012 +++++
.../api/WriteNotificationLogRequest.java | 949 +++++
.../api/WriteNotificationLogResponse.java | 283 ++
.../gen-php/metastore/ThriftHiveMetastore.php | 1630 ++++----
.../src/gen/thrift/gen-php/metastore/Types.php | 1630 +++++---
.../hive_metastore/ThriftHiveMetastore-remote | 7 +
.../hive_metastore/ThriftHiveMetastore.py | 1139 ++---
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 933 +++--
.../gen/thrift/gen-rb/hive_metastore_types.rb | 86 +-
.../gen/thrift/gen-rb/thrift_hive_metastore.rb | 54 +
.../hadoop/hive/metastore/HiveMetaStore.java | 86 +
.../hive/metastore/HiveMetaStoreClient.java | 10 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 16 +-
.../hive/metastore/MetaStoreEventListener.java | 12 +
.../metastore/MetaStoreListenerNotifier.java | 6 +
.../hadoop/hive/metastore/ObjectStore.java | 60 +
.../apache/hadoop/hive/metastore/RawStore.java | 14 +
.../hive/metastore/ReplChangeManager.java | 10 +-
.../hive/metastore/cache/CachedStore.java | 12 +
.../hive/metastore/events/AcidWriteEvent.java | 91 +
.../metastore/messaging/AcidWriteMessage.java | 50 +
.../metastore/messaging/CommitTxnMessage.java | 23 +
.../hive/metastore/messaging/EventMessage.java | 3 +-
.../messaging/MessageDeserializer.java | 9 +
.../metastore/messaging/MessageFactory.java | 12 +
.../messaging/json/JSONAcidWriteMessage.java | 150 +
.../messaging/json/JSONCommitTxnMessage.java | 95 +
.../messaging/json/JSONMessageDeserializer.java | 9 +
.../messaging/json/JSONMessageFactory.java | 8 +
.../model/MTxnWriteNotificationLog.java | 123 +
.../hive/metastore/tools/SQLGenerator.java | 9 +
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 28 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 187 +-
.../hadoop/hive/metastore/txn/TxnStore.java | 11 +
.../hadoop/hive/metastore/utils/FileUtils.java | 12 +-
.../src/main/resources/package.jdo | 35 +
.../main/sql/derby/hive-schema-3.1.0.derby.sql | 15 +
.../main/sql/derby/hive-schema-4.0.0.derby.sql | 15 +
.../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql | 1 -
.../sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql | 16 +
.../main/sql/mssql/hive-schema-3.1.0.mssql.sql | 17 +
.../main/sql/mssql/hive-schema-4.0.0.mssql.sql | 17 +
.../sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql | 16 +
.../main/sql/mysql/hive-schema-3.0.0.mysql.sql | 1 -
.../main/sql/mysql/hive-schema-3.1.0.mysql.sql | 16 +
.../main/sql/mysql/hive-schema-4.0.0.mysql.sql | 16 +
.../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql | 4 +-
.../sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql | 16 +
.../sql/oracle/hive-schema-3.0.0.oracle.sql | 1 -
.../sql/oracle/hive-schema-3.1.0.oracle.sql | 15 +
.../sql/oracle/hive-schema-4.0.0.oracle.sql | 15 +
.../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql | 4 +-
.../oracle/upgrade-3.0.0-to-3.1.0.oracle.sql | 16 +
.../sql/postgres/hive-schema-3.0.0.postgres.sql | 2 -
.../sql/postgres/hive-schema-3.1.0.postgres.sql | 15 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 15 +
.../upgrade-3.0.0-to-3.1.0.postgres.sql | 16 +
.../src/main/thrift/hive_metastore.thrift | 30 +-
.../DummyRawStoreControlledCommit.java | 11 +
.../DummyRawStoreForJdoConnection.java | 10 +
.../HiveMetaStoreClientPreCatalog.java | 10 +-
137 files changed, 15896 insertions(+), 7205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6321f9b..717cc8a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -75,11 +76,14 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -269,10 +273,16 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public PartitionFiles next() {
try {
Partition p = partitionIter.next();
- List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
+ Iterator<String> fileIterator;
+// For transactional tables, the actual file copy is done by the ACID write event during replay of the commit txn.
+ if (!TxnUtils.isTransactionalTable(t)) {
+ List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
+ fileIterator = files.iterator();
+ } else {
+ fileIterator = Collections.emptyIterator();
+ }
PartitionFiles partitionFiles =
- new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()),
- files.iterator());
+ new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()), fileIterator);
return partitionFiles;
} catch (MetaException e) {
throw new RuntimeException(e);
@@ -414,10 +424,15 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
class FileChksumIterator implements Iterator<String> {
private List<String> files;
private List<String> chksums;
+ private List<String> subDirs;
int i = 0;
FileChksumIterator(List<String> files, List<String> chksums) {
+ this(files, chksums, null);
+ }
+ FileChksumIterator(List<String> files, List<String> chksums, List<String> subDirs) {
this.files = files;
this.chksums = chksums;
+ this.subDirs = subDirs;
}
@Override
public boolean hasNext() {
@@ -428,7 +443,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
public String next() {
String result;
try {
- result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null, null);
+ result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null,
+ subDirs != null ? subDirs.get(i) : null);
} catch (IOException e) {
// File operations failed
LOG.error("Encoding file URI failed with error " + e.getMessage());
@@ -623,6 +639,23 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
}
}
+ @Override
+ public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
+ throws MetaException {
+ AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent,
+ new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(),
+ acidWriteEvent.getSubDirs()));
+ NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString());
+ event.setMessageFormat(msgFactory.getMessageFormat());
+ event.setDbName(acidWriteEvent.getDatabase());
+ event.setTableName(acidWriteEvent.getTable());
+ try {
+ addWriteNotificationLog(event, acidWriteEvent, dbConn, sqlGenerator, msg);
+ } catch (SQLException e) {
+ throw new MetaException("Unable to add write notification log " + StringUtils.stringifyException(e));
+ }
+ }
+
private int now() {
long millis = System.currentTimeMillis();
millis /= 1000;
@@ -634,12 +667,133 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
return (int)millis;
}
+ /**
+ * Close statement instance.
+ * @param stmt statement instance.
+ */
+ private static void closeStmt(Statement stmt) {
+ try {
+ if (stmt != null && !stmt.isClosed()) {
+ stmt.close();
+ }
+ } catch (SQLException e) {
+ LOG.warn("Failed to close statement " + e.getMessage());
+ }
+ }
+
+ /**
+ * Close the ResultSet.
+ * @param rs may be {@code null}
+ */
+ private static void close(ResultSet rs) {
+ try {
+ if (rs != null && !rs.isClosed()) {
+ rs.close();
+ }
+ } catch(SQLException ex) {
+ LOG.warn("Failed to close result set " + ex.getMessage());
+ }
+ }
+
+ private long getNextNLId(Statement stmt, SQLGenerator sqlGenerator, String sequence)
+ throws SQLException, MetaException {
+ String s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
+ "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + quoteString(sequence));
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = null;
+ try {
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction database not properly configured, can't find next NL id.");
+ }
+
+ long nextNLId = rs.getLong(1);
+ long updatedNLId = nextNLId + 1;
+ s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
+ quoteString(sequence);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ return nextNLId;
+ } finally {
+ close(rs);
+ }
+ }
+
+ private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn,
+ SQLGenerator sqlGenerator, AcidWriteMessage msg) throws MetaException, SQLException {
+ LOG.debug("DbNotificationListener: adding write notification log for : {}", event.getMessage());
+ assert ((dbConn != null) && (sqlGenerator != null));
+
+ Statement stmt = null;
+ ResultSet rs = null;
+ String dbName = acidWriteEvent.getDatabase();
+ String tblName = acidWriteEvent.getTable();
+ String partition = acidWriteEvent.getPartition();
+ String tableObj = msg.getTableObjStr();
+ String partitionObj = msg.getPartitionObjStr();
+ String files = ReplChangeManager.joinWithSeparator(msg.getFiles());
+
+ try {
+ stmt = dbConn.createStatement();
+ if (sqlGenerator.getDbProduct() == MYSQL) {
+ stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
+ }
+
+ String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" +
+ " \"TXN_WRITE_NOTIFICATION_LOG\" " +
+ "where \"WNL_DATABASE\" = " + quoteString(dbName) +
+ "and \"WNL_TABLE\" = " + quoteString(tblName) + " and \"WNL_PARTITION\" = " +
+ quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ // If rs is empty then no lock is taken, so this cannot cause a deadlock.
+ long nextNLId = getNextNLId(stmt, sqlGenerator,
+ "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog");
+ s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" (\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\"," +
+ " \"WNL_DATABASE\", \"WNL_TABLE\"," +
+ " \"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", \"WNL_FILES\", \"WNL_EVENT_TIME\")" +
+ " values (" + nextNLId
+ + "," + acidWriteEvent.getTxnId() + "," + acidWriteEvent.getWriteId()+ "," +
+ quoteString(dbName)+ "," + quoteString(tblName)+ "," + quoteString(partition)+ "," +
+ quoteString(tableObj)+ "," + quoteString(partitionObj) + "," + quoteString(files)+
+ "," + now() + ")";
+ LOG.info("Going to execute insert <" + s + ">");
+ stmt.execute(sqlGenerator.addEscapeCharacters(s));
+ } else {
+ String existingFiles = rs.getString(1);
+ if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) {
+ // If the list of files is already present then there is no need to update it again. This scenario can
+ // occur when the metastore retries the same operation.
+ LOG.info("File list " + files + " is already present");
+ return;
+ }
+ long nlId = rs.getLong(2);
+ files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles));
+ s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = " + quoteString(tableObj) + "," +
+ " \"WNL_PARTITION_OBJ\" = " + quoteString(partitionObj) + "," +
+ " \"WNL_FILES\" = " + quoteString(files) + "," +
+ " \"WNL_EVENT_TIME\" = " + now() +
+ " where \"WNL_ID\" = " + nlId;
+ LOG.info("Going to execute update <" + s + ">");
+ stmt.executeUpdate(sqlGenerator.addEscapeCharacters(s));
+ }
+ } catch (SQLException e) {
+ LOG.warn("failed to add write notification log" + e.getMessage());
+ throw e;
+ } finally {
+ closeStmt(stmt);
+ close(rs);
+ }
+ }
+
static String quoteString(String input) {
return "'" + input + "'";
}
private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent, Connection dbConn,
SQLGenerator sqlGenerator) throws MetaException, SQLException {
+ LOG.debug("DbNotificationListener: adding notification log for : {}", event.getMessage());
if ((dbConn == null) || (sqlGenerator == null)) {
LOG.info("connection or sql generator is not set so executing sql via DN");
process(event, listenerEvent);
@@ -669,22 +823,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
- s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
- "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " +
- " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("failed to get next NEXT_VAL from SEQUENCE_TABLE");
- }
-
- long nextNLId = rs.getLong(1);
- long updatedNLId = nextNLId + 1;
- s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
-
- " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'";
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+ long nextNLId = getNextNLId(stmt, sqlGenerator,
+ "org.apache.hadoop.hive.metastore.model.MNotificationLog");
List<String> insert = new ArrayList<>();
@@ -712,20 +852,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
LOG.warn("failed to add notification log" + e.getMessage());
throw e;
} finally {
- if (stmt != null && !stmt.isClosed()) {
- try {
- stmt.close();
- } catch (SQLException e) {
- LOG.warn("Failed to close statement " + e.getMessage());
- }
- }
- if (rs != null && !rs.isClosed()) {
- try {
- rs.close();
- } catch (SQLException e) {
- LOG.warn("Failed to close result set " + e.getMessage());
- }
- }
+ closeStmt(stmt);
+ close(rs);
}
}
@@ -742,12 +870,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
event.getMessage());
HMSHandler.getMSForConf(conf).addNotificationEvent(event);
- // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
- if (event.isSetEventId()) {
- listenerEvent.putParameter(
- MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
- Long.toString(event.getEventId()));
- }
+ // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+ if (event.isSetEventId()) {
+ listenerEvent.putParameter(
+ MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+ Long.toString(event.getEventId()));
+ }
}
private static class CleanerThread extends Thread {
@@ -768,6 +896,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
while (true) {
try {
rs.cleanNotificationEvents(ttl);
+ rs.cleanWriteNotificationEvents(ttl);
} catch (Exception ex) {
//catching exceptions here makes sure that the thread doesn't die in case of unexpected
//exceptions
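
The addWriteNotificationLog change above boils down to an idempotent upsert: one TXN_WRITE_NOTIFICATION_LOG row per (database, table, partition, txnId), inserted on the first write and appended to when the metastore retries. A minimal plain-JDBC sketch of that pattern, assuming the hypothetical helper name recordWrite, a "," file separator, and an auto-assigned WNL_ID (the committed code takes it from SEQUENCE_TABLE via getNextNLId):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class WriteNotificationSketch {
      // One row per (db, table, partition, txnId); WNL_ID sequencing is omitted here.
      static void recordWrite(Connection conn, String db, String tbl, String part,
          long txnId, long writeId, String files) throws SQLException {
        String select = "select WNL_ID, WNL_FILES from TXN_WRITE_NOTIFICATION_LOG"
            + " where WNL_DATABASE = ? and WNL_TABLE = ? and WNL_PARTITION = ? and WNL_TXNID = ?";
        try (PreparedStatement ps = conn.prepareStatement(select)) {
          ps.setString(1, db); ps.setString(2, tbl); ps.setString(3, part); ps.setLong(4, txnId);
          try (ResultSet rs = ps.executeQuery()) {
            if (!rs.next()) {
              // First write of this txn to this partition: insert a fresh row.
              try (PreparedStatement ins = conn.prepareStatement(
                  "insert into TXN_WRITE_NOTIFICATION_LOG (WNL_TXNID, WNL_WRITEID,"
                  + " WNL_DATABASE, WNL_TABLE, WNL_PARTITION, WNL_FILES) values (?, ?, ?, ?, ?, ?)")) {
                ins.setLong(1, txnId); ins.setLong(2, writeId); ins.setString(3, db);
                ins.setString(4, tbl); ins.setString(5, part); ins.setString(6, files);
                ins.executeUpdate();
              }
              return;
            }
            long id = rs.getLong(1);
            String existing = rs.getString(2);
            if (existing.contains(files)) {
              return; // Retry of the same operation: the file list is already recorded.
            }
            // Retry carrying new files: append to the existing row rather than insert a duplicate.
            try (PreparedStatement upd = conn.prepareStatement(
                "update TXN_WRITE_NOTIFICATION_LOG set WNL_FILES = ? where WNL_ID = ?")) {
              upd.setString(1, existing + "," + files);
              upd.setLong(2, id);
              upd.executeUpdate();
            }
          }
        }
      }
    }
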
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index abf67a8..b4b118e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMNullablePool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
import org.apache.thrift.TException;
@@ -880,6 +881,20 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
}
@Override
+ public void cleanWriteNotificationEvents(int olderThan) {
+ if (!shouldEventSucceed) {
+ // Throw exception to simulate an issue with the cleaner thread.
+ throw new RuntimeException("Dummy exception while cleaning write notifications");
+ }
+ objectStore.cleanWriteNotificationEvents(olderThan);
+ }
+
+ @Override
+ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ return objectStore.getAllWriteEventInfo(txnId, dbName, tableName);
+ }
+
+ @Override
public CurrentNotificationEventId getCurrentNotificationEventId() {
return objectStore.getCurrentNotificationEventId();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index eef917e..82429e3 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -238,6 +239,10 @@ public class TestDbNotificationListener {
public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
}
+
+ public void onAcidWrite(AcidWriteEvent acidWriteEvent) throws MetaException {
+ pushEventId(EventType.ACID_WRITE, acidWriteEvent);
+ }
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 46c623d..c82a933 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -2833,78 +2833,6 @@ public class TestReplicationScenarios {
verifyRun("SELECT max(a) from " + replDbName + ".ptned2 where b=1", new String[]{"8"}, driverMirror);
}
- // TODO: This test should be removed once ACID tables replication is supported.
- @Test
- public void testSkipTables() throws Exception {
- String testName = "skipTables";
- String dbName = createDB(testName, driver);
- String replDbName = dbName + "_dupe";
-
- // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables.
- // If I change it to use proper txn manager, the setup for some tests hangs.
- // This used to work by accident, now this works due a test flag. The test needs to be fixed.
- // Create table
- run("CREATE TABLE " + dbName + ".acid_table (key int, value int) PARTITIONED BY (load_date date) " +
- "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
- run("CREATE TABLE " + dbName + ".mm_table (key int, value int) PARTITIONED BY (load_date date) " +
- "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," +
- " 'transactional_properties'='insert_only')", driver);
- verifyIfTableExist(dbName, "acid_table", metaStoreClient);
- verifyIfTableExist(dbName, "mm_table", metaStoreClient);
-
- // Bootstrap test
- Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
- String replDumpId = bootstrapDump.lastReplId;
- verifyIfTableNotExist(replDbName, "acid_table", metaStoreClientMirror);
- verifyIfTableNotExist(replDbName, "mm_table", metaStoreClientMirror);
-
- // Test alter table
- run("ALTER TABLE " + dbName + ".acid_table RENAME TO " + dbName + ".acid_table_rename", driver);
- verifyIfTableExist(dbName, "acid_table_rename", metaStoreClient);
-
- // Dummy create table command to mark proper last repl ID after dump
- run("CREATE TABLE " + dbName + ".dummy (a int)", driver);
-
- // Perform REPL-DUMP/LOAD
- Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
- replDumpId = incrementalDump.lastReplId;
- verifyIfTableNotExist(replDbName, "acid_table_rename", metaStoreClientMirror);
-
- // Create another table for incremental repl verification
- run("CREATE TABLE " + dbName + ".acid_table_incremental (key int, value int) PARTITIONED BY (load_date date) " +
- "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
- run("CREATE TABLE " + dbName + ".mm_table_incremental (key int, value int) PARTITIONED BY (load_date date) " +
- "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," +
- " 'transactional_properties'='insert_only')", driver);
- verifyIfTableExist(dbName, "acid_table_incremental", metaStoreClient);
- verifyIfTableExist(dbName, "mm_table_incremental", metaStoreClient);
-
- // Dummy insert into command to mark proper last repl ID after dump
- run("INSERT INTO " + dbName + ".dummy values(1)", driver);
-
- // Perform REPL-DUMP/LOAD
- incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
- replDumpId = incrementalDump.lastReplId;
- verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror);
- verifyIfTableNotExist(replDbName, "mm_table_incremental", metaStoreClientMirror);
-
- // Test adding a constraint
- run("ALTER TABLE " + dbName + ".acid_table_incremental ADD CONSTRAINT key_pk PRIMARY KEY (key) DISABLE NOVALIDATE", driver);
- try {
- List<SQLPrimaryKey> pks = metaStoreClient.getPrimaryKeys(new PrimaryKeysRequest(dbName, "acid_table_incremental"));
- assertEquals(pks.size(), 1);
- } catch (TException te) {
- assertNull(te);
- }
-
- // Dummy insert into command to mark proper last repl ID after dump
- run("INSERT INTO " + dbName + ".dummy values(2)", driver);
-
- // Perform REPL-DUMP/LOAD
- incrementalLoadAndVerify(dbName, replDumpId, replDbName);
- verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror);
- }
-
@Test
public void testDeleteStagingDir() throws IOException {
String testName = "deleteStagingDir";
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 86c0405..8c683cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.junit.rules.TestName;
+
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +54,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Nullable;
+import java.util.Collections;
+import com.google.common.collect.Lists;
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -66,8 +69,13 @@ public class TestReplicationScenariosAcidTables {
protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
private static WarehouseInstance primary, replica, replicaNonAcid;
- private String primaryDbName, replicatedDbName;
private static HiveConf conf;
+ private String primaryDbName, replicatedDbName, primaryDbNameExtra;
+ private enum OperationType {
+ REPL_TEST_ACID_INSERT, REPL_TEST_ACID_INSERT_SELECT, REPL_TEST_ACID_CTAS,
+ REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL,
+ REPL_TEST_ACID_INSERT_UNION
+ }
@BeforeClass
public static void classLevelSetup() throws Exception {
@@ -80,9 +88,13 @@ public class TestReplicationScenariosAcidTables {
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "true");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
- put("hive.repl.dump.include.acid.tables", "true");
put("hive.metastore.client.capability.check", "false");
put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+ put("hive.exec.dynamic.partition.mode", "nonstrict");
+ put("hive.strict.checks.bucketing", "false");
+ put("hive.mapred.mode", "nonstrict");
+ put("mapred.input.dir.recursive", "true");
+ put("hive.metastore.disallow.incompatible.col.type.changes", "false");
}};
primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
@@ -90,7 +102,6 @@ public class TestReplicationScenariosAcidTables {
put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
put("hive.support.concurrency", "false");
put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
- put("hive.repl.dump.include.acid.tables", "true");
put("hive.metastore.client.capability.check", "false");
}};
replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
@@ -109,6 +120,9 @@ public class TestReplicationScenariosAcidTables {
replicatedDbName = "replicated_" + primaryDbName;
primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
SOURCE_OF_REPLICATION + "' = '1,2,3')");
+ primaryDbNameExtra = primaryDbName + "_extra";
+ primary.run("create database " + primaryDbNameExtra + " WITH DBPROPERTIES ( '" +
+ SOURCE_OF_REPLICATION + "' = '1,2,3')");
}
@After
@@ -116,6 +130,7 @@ public class TestReplicationScenariosAcidTables {
primary.run("drop database if exists " + primaryDbName + " cascade");
replica.run("drop database if exists " + replicatedDbName + " cascade");
replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
+ primary.run("drop database if exists " + primaryDbName + "_extra cascade");
}
@Test
@@ -482,4 +497,585 @@ public class TestReplicationScenariosAcidTables {
primary.run("DROP TABLE " + dbName + ".normal");
primary.run("drop database " + dbName);
}
+
+ @Test
+ public void testAcidTableIncrementalReplication() throws Throwable {
+ WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+ replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(bootStrapDump.lastReplicationId);
+ List<String> selectStmtList = new ArrayList<>();
+ List<String[]> expectedValues = new ArrayList<>();
+
+ appendInsert(selectStmtList, expectedValues);
+ appendDelete(selectStmtList, expectedValues);
+ appendUpdate(selectStmtList, expectedValues);
+ appendTruncate(selectStmtList, expectedValues);
+ appendInsertIntoFromSelect(selectStmtList, expectedValues);
+ appendMerge(selectStmtList, expectedValues);
+ appendCreateAsSelect(selectStmtList, expectedValues);
+ appendImport(selectStmtList, expectedValues);
+ appendInsertOverwrite(selectStmtList, expectedValues);
+ //appendLoadLocal(selectStmtList, expectedValues);
+ appendInsertUnion(selectStmtList, expectedValues);
+ appendMultiStatementTxn(selectStmtList, expectedValues);
+ appendMultiStatementTxnUpdateDelete(selectStmtList, expectedValues);
+
+ verifyIncrementalLoad(selectStmtList, expectedValues, bootStrapDump.lastReplicationId);
+ }
+
+ private void appendInsert(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testInsert";
+ String tableNameMM = tableName + "_MM";
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendDelete(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testDelete";
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ deleteRecords(tableName);
+ selectStmtList.add("select count(*) from " + tableName);
+ expectedValues.add(new String[] {"0"});
+ }
+
+ private void appendUpdate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testUpdate";
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ updateRecords(tableName);
+ selectStmtList.add("select value from " + tableName + " order by value");
+ expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
+ }
+
+ private void appendTruncate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testTruncate";
+ String tableNameMM = tableName + "_MM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ truncateTable(primaryDbName, tableName);
+ selectStmtList.add("select count(*) from " + tableName);
+ expectedValues.add(new String[] {"0"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ truncateTable(primaryDbName, tableNameMM);
+ selectStmtList.add("select count(*) from " + tableNameMM);
+ expectedValues.add(new String[] {"0"});
+ }
+
+ private void appendInsertIntoFromSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testInsertIntoFromSelect";
+ String tableNameMM = tableName + "_MM";
+ String tableNameSelect = testName.getMethodName() + "_Select";
+ String tableNameSelectMM = testName.getMethodName() + "_SelectMM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameSelect, false, OperationType.REPL_TEST_ACID_INSERT_SELECT);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameSelect + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameSelectMM, true, OperationType.REPL_TEST_ACID_INSERT_SELECT);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameSelectMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendMerge(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testMerge";
+ String tableNameMerge = testName.getMethodName() + "_Merge";
+
+ insertForMerge(tableName, tableNameMerge, false);
+ selectStmtList.add("select last_update_user from " + tableName + " order by last_update_user");
+ expectedValues.add(new String[] {"creation", "creation", "creation", "creation", "creation",
+ "creation", "creation", "merge_update", "merge_insert", "merge_insert"});
+ selectStmtList.add("select ID from " + tableNameMerge + " order by ID");
+ expectedValues.add(new String[] {"1", "4", "7", "8", "8", "11"});
+ }
+
+ private void appendCreateAsSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testCreateAsSelect";
+ String tableNameMM = tableName + "_MM";
+ String tableNameCTAS = testName.getMethodName() + "_CTAS";
+ String tableNameCTASMM = testName.getMethodName() + "_CTASMM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameCTAS, false, OperationType.REPL_TEST_ACID_CTAS);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameCTAS + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameCTASMM, true, OperationType.REPL_TEST_ACID_CTAS);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameCTASMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendImport(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testImport";
+ String tableNameMM = tableName + "_MM";
+ String tableNameImport = testName.getMethodName() + "_Import";
+ String tableNameImportMM = testName.getMethodName() + "_ImportMM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameImport, false, OperationType.REPL_TEST_ACID_INSERT_IMPORT);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameImport + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameImportMM, true, OperationType.REPL_TEST_ACID_INSERT_IMPORT);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameImportMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendInsertOverwrite(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testInsertOverwrite";
+ String tableNameOW = testName.getMethodName() + "_OW";
+ String tableNameMM = tableName + "_MM";
+ String tableNameOWMM = testName.getMethodName() + "_OWMM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameOW, false, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameOW + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameOWMM, true, OperationType.REPL_TEST_ACID_INSERT_OVERWRITE);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameOWMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ // TODO: Need to check why it's failing. Loading into an ACID table from a local path fails.
+ private void appendLoadLocal(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testLoadLocal";
+ String tableNameLL = testName.getMethodName() + "_LL";
+ String tableNameMM = tableName + "_MM";
+ String tableNameLLMM = testName.getMethodName() + "_LLMM";
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameLL, false, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameLL + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameLLMM, true, OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ selectStmtList.add("select key from " + tableNameLLMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendInsertUnion(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testInsertUnion";
+ String tableNameUnion = testName.getMethodName() + "_UNION";
+ String tableNameMM = tableName + "_MM";
+ String tableNameUnionMM = testName.getMethodName() + "_UNIONMM";
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ String[] resultArrayUnion = new String[]{"1", "1", "2", "2", "3", "3", "4", "4", "5", "5"};
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableName, tableNameUnion, false, OperationType.REPL_TEST_ACID_INSERT_UNION);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(resultArray);
+ selectStmtList.add( "select key from " + tableNameUnion + " order by key");
+ expectedValues.add(resultArrayUnion);
+ selectStmtList.add("select key from " + tableName + "_nopart" + " order by key");
+ expectedValues.add(resultArray);
+ selectStmtList.add("select key from " + tableNameUnion + "_nopart" + " order by key");
+ expectedValues.add(resultArrayUnion);
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ insertRecords(tableNameMM, tableNameUnionMM, true, OperationType.REPL_TEST_ACID_INSERT_UNION);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(resultArray);
+ selectStmtList.add( "select key from " + tableNameUnionMM + " order by key");
+ expectedValues.add(resultArrayUnion);
+ selectStmtList.add("select key from " + tableNameMM + "_nopart" + " order by key");
+ expectedValues.add(resultArray);
+ selectStmtList.add("select key from " + tableNameUnionMM + "_nopart" + " order by key");
+ expectedValues.add(resultArrayUnion);
+ }
+
+ private void appendMultiStatementTxn(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ String tableName = testName.getMethodName() + "testMultiStatementTxn";
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ String tableNameMM = tableName + "_MM";
+ String tableProperty = "'transactional'='true'";
+
+ insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true);
+ selectStmtList.add("select key from " + tableName + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+ tableProperty = setMMtableProperty(tableProperty);
+ insertIntoDB(primaryDbName, tableNameMM, tableProperty, resultArray, true);
+ selectStmtList.add("select key from " + tableNameMM + " order by key");
+ expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+ }
+
+ private void appendMultiStatementTxnUpdateDelete(List<String> selectStmtList, List<String[]> expectedValues)
+ throws Throwable {
+ String tableName = testName.getMethodName() + "testMultiStatementTxnUpdate";
+ String tableNameDelete = testName.getMethodName() + "testMultiStatementTxnDelete";
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ String tableProperty = "'transactional'='true'";
+
+ insertIntoDB(primaryDbName, tableName, tableProperty, resultArray, true);
+ updateRecords(tableName);
+ selectStmtList.add("select value from " + tableName + " order by value");
+ expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
+
+ insertIntoDB(primaryDbName, tableNameDelete, tableProperty, resultArray, true);
+ deleteRecords(tableNameDelete);
+ selectStmtList.add("select count(*) from " + tableNameDelete);
+ expectedValues.add(new String[] {"0"});
+ }
+
+ @Test
+ public void testReplCM() throws Throwable {
+ String tableName = testName.getMethodName();
+ String tableNameMM = testName.getMethodName() + "_MM";
+ String[] result = new String[]{"5"};
+
+ WarehouseInstance.Tuple incrementalDump;
+ WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
+ replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName)
+ .verifyResult(bootStrapDump.lastReplicationId);
+
+ insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+ incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+ truncateTable(primaryDbName, tableName);
+ replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
+ verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableName,
+ "select count(*) from " + tableName + "_nopart"),
+ Lists.newArrayList(result, result));
+
+ insertRecords(tableNameMM, null, true, OperationType.REPL_TEST_ACID_INSERT);
+ incrementalDump = primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
+ truncateTable(primaryDbName, tableNameMM);
+ replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
+ verifyResultsInReplica(Lists.newArrayList("select count(*) from " + tableNameMM,
+ "select count(*) from " + tableNameMM + "_nopart"),
+ Lists.newArrayList(result, result));
+ }
+
+ @Test
+ public void testMultiDBTxn() throws Throwable {
+ String tableName = testName.getMethodName();
+ String dbName1 = tableName + "_db1";
+ String dbName2 = tableName + "_db2";
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ String tableProperty = "'transactional'='true'";
+ String txnStrStart = "START TRANSACTION";
+ String txnStrCommit = "COMMIT";
+
+ WarehouseInstance.Tuple incrementalDump;
+ primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
+ WarehouseInstance.Tuple bootStrapDump = primary.dump("`*`", null);
+
+ primary.run("use " + primaryDbName)
+ .run("create database " + dbName1 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
+ .run("create database " + dbName2 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
+ .run("CREATE TABLE " + dbName1 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("use " + dbName1)
+ .run("SHOW TABLES LIKE '" + tableName + "'")
+ .verifyResult(tableName)
+ .run("CREATE TABLE " + dbName2 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("use " + dbName2)
+ .run("SHOW TABLES LIKE '" + tableName + "'")
+ .verifyResult(tableName)
+ .run(txnStrStart)
+ .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+ .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+ .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+ .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+ .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+ .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+ .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+ .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+ .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+ .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+ .run("select key from " + dbName2 + "." + tableName + " order by key")
+ .verifyResults(resultArray)
+ .run("select key from " + dbName1 + "." + tableName + " order by key")
+ .verifyResults(resultArray)
+ .run(txnStrCommit);
+
+ incrementalDump = primary.dump("`*`", bootStrapDump.lastReplicationId);
+
+ // Due to the limitation that we can have only one instance of the Persistence Manager Factory in a JVM,
+ // we are not able to create multiple embedded Derby instances for two different MetaStore instances.
+ primary.run("drop database " + primaryDbName + " cascade");
+ primary.run("drop database " + dbName1 + " cascade");
+ primary.run("drop database " + dbName2 + " cascade");
+ //End of additional steps
+
+ replica.loadWithoutExplain("", bootStrapDump.dumpLocation)
+ .run("REPL STATUS default")
+ .verifyResult(bootStrapDump.lastReplicationId);
+
+ replica.loadWithoutExplain("", incrementalDump.dumpLocation)
+ .run("REPL STATUS " + dbName1)
+ .run("select key from " + dbName1 + "." + tableName + " order by key")
+ .verifyResults(resultArray)
+ .run("select key from " + dbName2 + "." + tableName + " order by key")
+ .verifyResults(resultArray);
+
+ replica.run("drop database " + primaryDbName + " cascade");
+ replica.run("drop database " + dbName1 + " cascade");
+ replica.run("drop database " + dbName2 + " cascade");
+ }
+
+ private void verifyResultsInReplica(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+ for (int idx = 0; idx < selectStmtList.size(); idx++) {
+ replica.run("use " + replicatedDbName)
+ .run(selectStmtList.get(idx))
+ .verifyResults(expectedValues.get(idx));
+ }
+ }
+
+ private WarehouseInstance.Tuple verifyIncrementalLoad(List<String> selectStmtList,
+ List<String[]> expectedValues, String lastReplId) throws Throwable {
+ WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, lastReplId);
+ replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
+ verifyResultsInReplica(selectStmtList, expectedValues);
+
+ replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+ .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
+ verifyResultsInReplica(selectStmtList, expectedValues);
+ return incrementalDump;
+ }
+
+ private void deleteRecords(String tableName) throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("delete from " + tableName)
+ .run("select count(*) from " + tableName)
+ .verifyResult("0");
+ }
+
+ private void updateRecords(String tableName) throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("update " + tableName + " set value = 100 where key >= 2")
+ .run("select value from " + tableName + " order by value")
+ .verifyResults(new String[] {"1", "100", "100", "100", "100"});
+ }
+
+ private void truncateTable(String dbName, String tableName) throws Throwable {
+ primary.run("use " + dbName)
+ .run("truncate table " + tableName)
+ .run("select count(*) from " + tableName)
+ .verifyResult("0")
+ .run("truncate table " + tableName + "_nopart")
+ .run("select count(*) from " + tableName + "_nopart")
+ .verifyResult("0");
+ }
+
+ private WarehouseInstance.Tuple verifyLoad(String tableName, String tableNameOp, String lastReplId) throws Throwable {
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ if (tableNameOp == null) {
+ return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key",
+ "select key from " + tableName + "_nopart order by key"),
+ Lists.newArrayList(resultArray, resultArray), lastReplId);
+ }
+ return verifyIncrementalLoad(Lists.newArrayList("select key from " + tableName + " order by key",
+ "select key from " + tableNameOp + " order by key",
+ "select key from " + tableName + "_nopart" + " order by key",
+ "select key from " + tableNameOp + "_nopart" + " order by key"),
+ Lists.newArrayList(resultArray, resultArray, resultArray, resultArray), lastReplId);
+ }
+
+ private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray, boolean isTxn)
+ throws Throwable {
+ String txnStrStart = "START TRANSACTION";
+ String txnStrCommit = "COMMIT";
+ if (!isTxn) {
+ txnStrStart = "use " + dbName; //dummy
+ txnStrCommit = "use " + dbName; //dummy
+ }
+ primary.run("use " + dbName);
+ primary.run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("SHOW TABLES LIKE '" + tableName + "'")
+ .verifyResult(tableName)
+ .run("CREATE TABLE " + tableName + "_nopart (key int, value int) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("SHOW TABLES LIKE '" + tableName + "_nopart'")
+ .run("ALTER TABLE " + tableName + " ADD PARTITION (load_date='2016-03-03')")
+ .run(txnStrStart)
+ .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+ .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+ .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+ .run("INSERT INTO " + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+ .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+ .run("select key from " + tableName + " order by key")
+ .verifyResults(resultArray)
+ .run("INSERT INTO " + tableName + "_nopart (key, value) select key, value from " + tableName)
+ .run("select key from " + tableName + "_nopart" + " order by key")
+ .verifyResults(resultArray)
+ .run(txnStrCommit);
+ }
+
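+ // Convenience overload: performs the same inserts without wrapping them in an explicit transaction.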
+ private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray)
+ throws Throwable {
+ insertIntoDB(dbName, tableName, tableProperty, resultArray, false);
+ }
+
+ private void insertRecords(String tableName, String tableNameOp, boolean isMMTable,
+ OperationType opType) throws Throwable {
+ insertRecordsIntoDB(primaryDbName, tableName, tableNameOp, isMMTable, opType);
+ }
+
+ private void insertRecordsIntoDB(String dbName, String tableName, String tableNameOp, boolean isMMTable,
+ OperationType opType) throws Throwable {
+ String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+ String tableProperty = "'transactional'='true'";
+ if (isMMTable) {
+ tableProperty = setMMTableProperty(tableProperty);
+ }
+ primary.run("use " + DbName);
+
+ switch (opType) {
+ case REPL_TEST_ACID_INSERT:
+ insertIntoDB(dbName, tableName, tableProperty, resultArray);
+ insertIntoDB(primaryDbNameExtra, tableName, tableProperty, resultArray);
+ return;
+ case REPL_TEST_ACID_INSERT_OVERWRITE:
+ primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+ .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (2, 2)")
+ .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (10, 12)")
+ .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-02') VALUES (11, 1)")
+ .run("select key from " + tableNameOp + " order by key")
+ .verifyResults(new String[]{"2", "10", "11"})
+ .run("insert overwrite table " + tableNameOp + " select * from " + tableName)
+ .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+ .run("INSERT INTO " + tableNameOp + "_nopart VALUES (2, 2)")
+ .run("INSERT INTO " + tableNameOp + "_nopart VALUES (10, 12)")
+ .run("INSERT INTO " + tableNameOp + "_nopart VALUES (11, 1)")
+ .run("select key from " + tableNameOp + "_nopart" + " order by key")
+ .verifyResults(new String[]{"2", "10", "11"})
+ .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + "_nopart")
+ .run("select key from " + tableNameOp + "_nopart" + " order by key");
+ break;
+ case REPL_TEST_ACID_INSERT_SELECT:
+ primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+ .run("insert into " + tableNameOp + " partition (load_date) select * from " + tableName)
+ .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+ .run("insert into " + tableNameOp + "_nopart select * from " + tableName + "_nopart");
+ break;
+ case REPL_TEST_ACID_INSERT_IMPORT:
+ String path = "hdfs:///tmp/" + DbName + "/";
+ String exportPath = "'" + path + tableName + "/'";
+ String exportPathNoPart = "'" + path + tableName + "_nopart/'";
+ primary.run("export table " + tableName + " to " + exportPath)
+ .run("import table " + tableNameOp + " from " + exportPath)
+ .run("export table " + tableName + "_nopart to " + exportPathNoPart)
+ .run("import table " + tableNameOp + "_nopart from " + exportPathNoPart);
+ break;
+ case REPL_TEST_ACID_CTAS:
+ primary.run("create table " + tableNameOp + " as select * from " + tableName)
+ .run("create table " + tableNameOp + "_nopart as select * from " + tableName + "_nopart");
+ break;
+ case REPL_TEST_ACID_INSERT_LOADLOCAL:
+ primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+ .verifyResult(tableNameOp)
+ .run("INSERT OVERWRITE LOCAL DIRECTORY './test.dat' SELECT a.* FROM " + tableName + " a")
+ .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp +
+ " PARTITION (load_date='2008-08-15')")
+ .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("SHOW TABLES LIKE '" + tableNameOp + "_nopart'")
+ .verifyResult(tableNameOp + "_nopart")
+ .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp + "_nopart");
+ break;
+ case REPL_TEST_ACID_INSERT_UNION:
+ primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+ .verifyResult(tableNameOp)
+ .run("insert overwrite table " + tableNameOp + " partition (load_date) select * from " + tableName +
+ " union all select * from " + tableName)
+ .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+ "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+ .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName +
+ "_nopart union all select * from " + tableName + "_nopart");
+ resultArray = new String[]{"1", "2", "3", "4", "5", "1", "2", "3", "4", "5"};
+ break;
+ default:
+ return;
+ }
+ primary.run("select key from " + tableNameOp + " order by key").verifyResults(resultArray);
+ primary.run("select key from " + tableNameOp + "_nopart" + " order by key").verifyResults(resultArray);
+ }
+
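+ // Marks a table as micro-managed (insert-only ACID) by appending 'transactional_properties'='insert_only'.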
+ private String setMMTableProperty(String tableProperty) {
+ return tableProperty.concat(", 'transactional_properties' = 'insert_only'");
+ }
+
+ private void insertForMerge(String tableName, String tableNameMerge, boolean isMMTable) throws Throwable {
+ String tableProperty = "'transactional'='true'";
+ if (isMMTable) {
+ tableProperty = setMMTableProperty(tableProperty);
+ }
+ primary.run("use " + primaryDbName)
+ .run("CREATE TABLE " + tableName + "( ID int, TranValue string, last_update_user string) PARTITIONED BY " +
+ "(tran_date string) CLUSTERED BY (ID) into 5 buckets STORED AS ORC TBLPROPERTIES " +
+ " ( "+ tableProperty + " )")
+ .run("SHOW TABLES LIKE '" + tableName + "'")
+ .verifyResult(tableName)
+ .run("CREATE TABLE " + tableNameMerge + " ( ID int, TranValue string, tran_date string) STORED AS ORC ")
+ .run("SHOW TABLES LIKE '" + tableNameMerge + "'")
+ .verifyResult(tableNameMerge)
+ .run("INSERT INTO " + tableName + " PARTITION (tran_date) VALUES (1, 'value_01', 'creation', '20170410')," +
+ " (2, 'value_02', 'creation', '20170410'), (3, 'value_03', 'creation', '20170410'), " +
+ " (4, 'value_04', 'creation', '20170410'), (5, 'value_05', 'creation', '20170413'), " +
+ " (6, 'value_06', 'creation', '20170413'), (7, 'value_07', 'creation', '20170413'), " +
+ " (8, 'value_08', 'creation', '20170413'), (9, 'value_09', 'creation', '20170413'), " +
+ " (10, 'value_10','creation', '20170413')")
+ .run("select ID from " + tableName + " order by ID")
+ .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"})
+ .run("INSERT INTO " + tableNameMerge + " VALUES (1, 'value_01', '20170410'), " +
+ " (4, NULL, '20170410'), (7, 'value_77777', '20170413'), " +
+ " (8, NULL, '20170413'), (8, 'value_08', '20170415'), " +
+ "(11, 'value_11', '20170415')")
+ .run("select ID from " + tableNameMerge + " order by ID")
+ .verifyResults(new String[] {"1", "4", "7", "8", "8", "11"})
+ .run("MERGE INTO " + tableName + " AS T USING " + tableNameMerge + " AS S ON T.ID = S.ID and" +
+ " T.tran_date = S.tran_date WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue " +
+ " IS NOT NULL) THEN UPDATE SET TranValue = S.TranValue, last_update_user = " +
+ " 'merge_update' WHEN MATCHED AND S.TranValue IS NULL THEN DELETE WHEN NOT MATCHED " +
+ " THEN INSERT VALUES (S.ID, S.TranValue,'merge_insert', S.tran_date)")
+ .run("select last_update_user from " + tableName + " order by last_update_user")
+ .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation",
+ "creation", "creation", "merge_update", "merge_insert", "merge_insert"});
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index ff7f9bc..16c124c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -321,8 +321,7 @@ public class TestReplicationScenariosAcrossInstances {
"clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
.run("create table table1 (i int, j int)")
.run("insert into table1 values (1,2)")
- .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
- "'hive.repl.dump.include.acid.tables'='true'"));
+ .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, tuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -341,8 +340,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("create table table2 (a int, city string) partitioned by (country string)")
.run("create table table3 (i int, j int)")
.run("insert into table1 values (1,2)")
- .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
- "'hive.repl.dump.include.acid.tables'='true'"));
+ .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
.run("use " + replicatedDbName)
@@ -467,8 +465,7 @@ public class TestReplicationScenariosAcrossInstances {
SOURCE_OF_REPLICATION + "' = '1,2,3')")
.run("use " + dbTwo)
.run("create table t1 (i int, j int)")
- .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
- "'hive.repl.dump.include.acid.tables'='true'"));
+ .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
/*
Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -527,8 +524,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("use " + dbOne)
.run("create table t1 (i int, j int) partitioned by (load_date date) "
+ "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
- .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
- "'hive.repl.dump.include.acid.tables'='true'"));
+ .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
String dbTwo = primaryDbName + randomTwo;
WarehouseInstance.Tuple incrementalTuple = primary
@@ -539,8 +535,7 @@ public class TestReplicationScenariosAcrossInstances {
.run("use " + dbOne)
.run("create table t2 (a int, b int)")
.dump("`*`", bootstrapTuple.lastReplicationId,
- Arrays.asList("'hive.repl.dump.metadata.only'='true'",
- "'hive.repl.dump.include.acid.tables'='true'"));
+ Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
/*
Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f666df1..1e3478d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -249,6 +249,11 @@ public class WarehouseInstance implements Closeable {
return this;
}
+ WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable {
+ run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+ return this;
+ }
+
WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
throws Throwable {
String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
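
A minimal usage sketch of the new helper, mirroring the incremental-load flow in the ACID tests above (primary, replica, and the db names are the fixtures from those tests):

    WarehouseInstance.Tuple dump = primary.dump(primaryDbName, lastReplId);
    replica.loadWithoutExplain(replicatedDbName, dump.dumpLocation)
        .run("REPL STATUS " + replicatedDbName)
        .verifyResult(dump.lastReplicationId);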
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index f87a6aa..2ba6d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.thrift.TException;
@@ -109,6 +110,10 @@ public final class SynchronizedMetaStoreClient {
return client.fireListenerEvent(rqst);
}
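+ /** Serialized (synchronized) pass-through to the metastore client's addWriteNotificationLog call. */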
+ public synchronized void addWriteNotificationLog(WriteNotificationLogRequest rqst) throws TException {
+ client.addWriteNotificationLog(rqst);
+ }
+
public synchronized void close() {
client.close();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 19097f5..bf7749d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -139,7 +139,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
deletePath = createTargetPath(targetPath, tgtFs);
}
- Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+ // For ACID table incremental replication, the content of the staging directory is copied
+ // to the destination as-is, so the target must not be cleaned first.
+ if (work.isNeedCleanTarget()) {
+ Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+ }
// Set isManaged to false as this is not load data operation for which it is needed.
if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
try {
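
A sketch of how a replication load would opt out of the cleanup (assuming MoveWork pairs the isNeedCleanTarget getter shown above with a setNeedCleanTarget setter; the setter is not visible in this excerpt):

    MoveWork moveWork = ...; // MoveWork built for an ACID incremental replication event
    moveWork.setNeedCleanTarget(false); // staging content is copied over as-is; keep the target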
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3a7f1bc..d095de6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -151,10 +151,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
continue;
}
String destFileName = srcFile.getCmPath().getName();
- Path destFile = new Path(toPath, destFileName);
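+ // Compute the per-file destination directory; it may be a subdirectory of toPath
+ // (e.g. an ACID base/delta directory).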
+ Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+ Path destFile = new Path(destRoot, destFileName);
if (dstFs.exists(destFile)) {
String destFileWithSourceName = srcFile.getSourcePath().getName();
- Path newDestFile = new Path(toPath, destFileWithSourceName);
+ Path newDestFile = new Path(destRoot, destFileWithSourceName);
boolean result = dstFs.rename(destFile, newDestFile);
if (!result) {
throw new IllegalStateException(
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 5bbc25a..c2953c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
@@ -60,8 +62,19 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
return 0;
}
} catch (InvalidTableException e) {
- LOG.info("Table does not exist so, ignoring the operation as it might be a retry(idempotent) case.");
- return 0;
+ // In scenarios like import into MM tables, the alloc-write-id event can be generated before the create-table event.
+ try {
+ Database database = Hive.get().getDatabase(work.getDbName());
+ if (!replicationSpec.allowReplacementInto(database.getParameters())) {
+ // If the event has already been replayed on this database, skip it.
+ LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
+ replicationSpec.getReplicationState() + " Event Type: " + work.getOperationType());
+ return 0;
+ }
+ } catch (HiveException e1) {
+ LOG.error("Get database failed with exception " + e1.getMessage());
+ return 1;
+ }
} catch (HiveException e) {
LOG.error("Get table failed with exception " + e.getMessage());
return 1;
@@ -85,10 +98,16 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
}
return 0;
case REPL_COMMIT_TXN:
- for (long txnId : work.getTxnIds()) {
- txnManager.replCommitTxn(replPolicy, txnId);
- LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
- }
+ // Currently only one commit txn per event is supported.
+ assert (work.getTxnIds().size() == 1);
+
+ long txnId = work.getTxnIds().get(0);
+ CommitTxnRequest commitTxnRequest = new CommitTxnRequest(txnId);
+ commitTxnRequest.setReplPolicy(work.getReplPolicy());
+ commitTxnRequest.setWriteEventInfos(work.getWriteEventInfos());
+ txnManager.replCommitTxn(commitTxnRequest);
+ LOG.info("Replayed CommitTxn Event for replPolicy: " + replPolicy + " with srcTxn: " + txnId +
+ "WriteEventInfos: " + work.getWriteEventInfos());
return 0;
case REPL_ALLOC_WRITE_ID:
assert work.getTxnToWriteIdList() != null;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index e48657c..82ecad1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -199,7 +199,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
cmRoot,
getHive(),
conf,
- getNewEventOnlyReplicationSpec(ev.getEventId())
+ getNewEventOnlyReplicationSpec(ev.getEventId()),
+ work.dbNameOrPattern,
+ work.tableNameOrPattern
);
EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
eventHandler.handle(context);