You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/04/17 07:24:50 UTC
[12/12] hive git commit: HIVE-19089: Create/Replicate Allocate
write-id event (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
HIVE-19089: Create/Replicate Allocate write-id event (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fa9e743e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fa9e743e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fa9e743e
Branch: refs/heads/master
Commit: fa9e743e7afbcd6409a51955e39e4e2bb3c109d2
Parents: 7fb088b
Author: Sankar Hariappan <sa...@apache.org>
Authored: Tue Apr 17 12:54:16 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Apr 17 12:54:16 2018 +0530
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 46 +-
.../listener/TestDbNotificationListener.java | 5 +
.../TestReplicationScenariosAcidTables.java | 25 +-
.../apache/hadoop/hive/ql/exec/ReplTxnTask.java | 40 +-
.../apache/hadoop/hive/ql/exec/ReplTxnWork.java | 55 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 9 +
.../hadoop/hive/ql/lockmgr/DummyTxnManager.java | 6 +
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 12 +
.../hadoop/hive/ql/metadata/HiveUtils.java | 10 +
.../ql/parse/ReplicationSemanticAnalyzer.java | 53 +-
.../hadoop/hive/ql/parse/repl/DumpType.java | 7 +
.../repl/dump/events/AllocWriteIdHandler.java | 41 +
.../repl/dump/events/EventHandlerFactory.java | 1 +
.../repl/load/message/AbortTxnHandler.java | 4 +-
.../repl/load/message/AllocWriteIdHandler.java | 64 +
.../repl/load/message/CommitTxnHandler.java | 7 +-
.../parse/repl/load/message/OpenTxnHandler.java | 4 +-
.../metastore/txn/TestCompactionTxnHandler.java | 5 +-
.../hive/metastore/txn/TestTxnHandler.java | 115 +-
.../hive/ql/lockmgr/TestDbTxnManager2.java | 21 +-
.../hive/ql/txn/compactor/CompactorTest.java | 6 +-
.../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp | 2258 ++++++-------
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3016 +++++++++---------
.../gen/thrift/gen-cpp/hive_metastore_types.h | 34 +-
.../metastore/api/AddDynamicPartitions.java | 32 +-
.../api/AllocateTableWriteIdsRequest.java | 519 ++-
.../api/AllocateTableWriteIdsResponse.java | 36 +-
.../metastore/api/ClearFileMetadataRequest.java | 32 +-
.../hive/metastore/api/ClientCapabilities.java | 32 +-
.../hive/metastore/api/CompactionRequest.java | 44 +-
.../hive/metastore/api/CreationMetadata.java | 32 +-
.../metastore/api/FindSchemasByColsResp.java | 36 +-
.../hive/metastore/api/FireEventRequest.java | 32 +-
.../metastore/api/GetAllFunctionsResponse.java | 36 +-
.../api/GetFileMetadataByExprRequest.java | 32 +-
.../api/GetFileMetadataByExprResult.java | 48 +-
.../metastore/api/GetFileMetadataRequest.java | 32 +-
.../metastore/api/GetFileMetadataResult.java | 44 +-
.../hive/metastore/api/GetTablesRequest.java | 32 +-
.../hive/metastore/api/GetTablesResult.java | 36 +-
.../api/HeartbeatTxnRangeResponse.java | 64 +-
.../metastore/api/InsertEventRequestData.java | 64 +-
.../hadoop/hive/metastore/api/LockRequest.java | 36 +-
.../hive/metastore/api/Materialization.java | 32 +-
.../api/NotificationEventResponse.java | 36 +-
.../metastore/api/PutFileMetadataRequest.java | 64 +-
.../hive/metastore/api/SchemaVersion.java | 36 +-
.../hive/metastore/api/ShowCompactResponse.java | 36 +-
.../hive/metastore/api/ShowLocksResponse.java | 36 +-
.../hive/metastore/api/ThriftHiveMetastore.java | 2476 +++++++-------
.../hive/metastore/api/WMFullResourcePlan.java | 144 +-
.../api/WMGetAllResourcePlanResponse.java | 36 +-
.../WMGetTriggersForResourePlanResponse.java | 36 +-
.../api/WMValidateResourcePlanResponse.java | 64 +-
.../gen-php/metastore/ThriftHiveMetastore.php | 1394 ++++----
.../src/gen/thrift/gen-php/metastore/Types.php | 842 ++---
.../hive_metastore/ThriftHiveMetastore.py | 940 +++---
.../gen/thrift/gen-py/hive_metastore/ttypes.py | 543 ++--
.../gen/thrift/gen-rb/hive_metastore_types.rb | 15 +-
.../hadoop/hive/metastore/HiveMetaStore.java | 11 +-
.../hive/metastore/HiveMetaStoreClient.java | 20 +-
.../hadoop/hive/metastore/IMetaStoreClient.java | 13 +-
.../hive/metastore/MetaStoreEventListener.java | 34 +-
.../metastore/MetaStoreListenerNotifier.java | 85 +-
.../hive/metastore/events/AbortTxnEvent.java | 32 -
.../metastore/events/AllocWriteIdEvent.java | 57 +
.../hive/metastore/events/CommitTxnEvent.java | 32 -
.../hive/metastore/events/ListenerEvent.java | 16 -
.../hive/metastore/events/OpenTxnEvent.java | 32 -
.../messaging/AllocWriteIdMessage.java | 36 +
.../hive/metastore/messaging/EventMessage.java | 3 +-
.../messaging/MessageDeserializer.java | 5 +
.../metastore/messaging/MessageFactory.java | 13 +
.../event/filters/DatabaseAndTableFilter.java | 3 +-
.../messaging/json/JSONAllocWriteIdMessage.java | 111 +
.../messaging/json/JSONMessageDeserializer.java | 9 +
.../messaging/json/JSONMessageFactory.java | 7 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 166 +-
.../src/main/thrift/hive_metastore.thrift | 11 +-
.../HiveMetaStoreClientPreCatalog.java | 19 +-
80 files changed, 7789 insertions(+), 6719 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 7f21573..59d1e3a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -18,6 +18,7 @@
package org.apache.hive.hcatalog.listener;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
@@ -449,7 +451,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
}
@Override
- public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+ public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
openTxnEvent.getTxnIds().get(lastTxnIdx));
@@ -457,35 +459,37 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString());
try {
- addNotificationLog(event, openTxnEvent);
+ addNotificationLog(event, openTxnEvent, dbConn, sqlGenerator);
} catch (SQLException e) {
throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
}
}
@Override
- public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
+ public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
+ throws MetaException {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage(
commitTxnEvent.getTxnId())
.toString());
try {
- addNotificationLog(event, commitTxnEvent);
+ addNotificationLog(event, commitTxnEvent, dbConn, sqlGenerator);
} catch (SQLException e) {
throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
}
}
@Override
- public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+ public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
+ throws MetaException {
NotificationEvent event =
new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage(
abortTxnEvent.getTxnId())
.toString());
try {
- addNotificationLog(event, abortTxnEvent);
+ addNotificationLog(event, abortTxnEvent, dbConn, sqlGenerator);
} catch (SQLException e) {
throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
}
@@ -591,6 +595,27 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
process(event, dropConstraintEvent);
}
+ /***
+ * @param allocWriteIdEvent Alloc write id event
+ * @throws MetaException
+ */
+ @Override
+ public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent, Connection dbConn, SQLGenerator sqlGenerator)
+ throws MetaException {
+ String tableName = allocWriteIdEvent.getTableName();
+ String dbName = allocWriteIdEvent.getDbName();
+ NotificationEvent event =
+ new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(), msgFactory
+ .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName).toString());
+ event.setTableName(tableName);
+ event.setDbName(dbName);
+ try {
+ addNotificationLog(event, allocWriteIdEvent, dbConn, sqlGenerator);
+ } catch (SQLException e) {
+ throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
+ }
+ }
+
private int now() {
long millis = System.currentTimeMillis();
millis /= 1000;
@@ -606,9 +631,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
return "'" + input + "'";
}
- private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent)
- throws MetaException, SQLException {
- if ((listenerEvent.getConnection() == null) || (listenerEvent.getSqlGenerator() == null)) {
+ private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent, Connection dbConn,
+ SQLGenerator sqlGenerator) throws MetaException, SQLException {
+ if ((dbConn == null) || (sqlGenerator == null)) {
LOG.info("connection or sql generator is not set so executing sql via DN");
process(event, listenerEvent);
return;
@@ -616,8 +641,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
Statement stmt = null;
ResultSet rs = null;
try {
- stmt = listenerEvent.getConnection().createStatement();
- SQLGenerator sqlGenerator = listenerEvent.getSqlGenerator();
+ stmt = dbConn.createStatement();
event.setMessageFormat(msgFactory.getMessageFormat());
if (sqlGenerator.getDbProduct() == MYSQL) {
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 70c6a94..eef917e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -233,6 +234,10 @@ public class TestDbNotificationListener {
public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
pushEventId(EventType.ABORT_TXN, abortTxnEvent);
}
+
+ public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
+ pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
+ }
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index cac9922..2ad83b6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -100,7 +100,8 @@ public class TestReplicationScenariosAcidTables {
.verifyResult(bootStrapDump.lastReplicationId);
// create table will start and coomit the transaction
- primary.run("CREATE TABLE " + tableName +
+ primary.run("use " + primaryDbName)
+ .run("CREATE TABLE " + tableName +
" (key int, value int) PARTITIONED BY (load_date date) " +
"CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')")
.run("SHOW TABLES LIKE '" + tableName + "'")
@@ -113,16 +114,12 @@ public class TestReplicationScenariosAcidTables {
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verifyResult(incrementalDump.lastReplicationId)
- .run("SHOW TABLES LIKE '" + tableName + "'")
- .verifyResult(tableName);
+ .verifyResult(incrementalDump.lastReplicationId);
// Test the idempotent behavior of Open and Commit Txn
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verifyResult(incrementalDump.lastReplicationId)
- .run("SHOW TABLES LIKE '" + tableName + "'")
- .verifyResult(tableName);
+ .verifyResult(incrementalDump.lastReplicationId);
}
@Test
@@ -135,7 +132,8 @@ public class TestReplicationScenariosAcidTables {
.verifyResult(bootStrapDump.lastReplicationId);
// this should fail
- primary.runFailure("CREATE TABLE " + tableNameFail +
+ primary.run("use " + primaryDbName)
+ .runFailure("CREATE TABLE " + tableNameFail +
" (key int, value int) PARTITIONED BY (load_date date) " +
"CLUSTERED BY(key) ('transactional'='true')")
.run("SHOW TABLES LIKE '" + tableNameFail + "'")
@@ -145,16 +143,12 @@ public class TestReplicationScenariosAcidTables {
primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verifyResult(incrementalDump.lastReplicationId)
- .run("SHOW TABLES LIKE '" + tableNameFail + "'")
- .verifyFailure(new String[]{tableNameFail});
+ .verifyResult(incrementalDump.lastReplicationId);
// Test the idempotent behavior of Abort Txn
replica.load(replicatedDbName, incrementalDump.dumpLocation)
.run("REPL STATUS " + replicatedDbName)
- .verifyResult(incrementalDump.lastReplicationId)
- .run("SHOW TABLES LIKE '" + tableNameFail + "'")
- .verifyFailure(new String[]{tableNameFail});
+ .verifyResult(incrementalDump.lastReplicationId);
}
@Test
@@ -165,7 +159,8 @@ public class TestReplicationScenariosAcidTables {
.run("REPL STATUS " + replicatedDbName)
.verifyResult(bootStrapDump.lastReplicationId);
- primary.run("CREATE TABLE " + tableName +
+ primary.run("use " + primaryDbName)
+ .run("CREATE TABLE " + tableName +
" (key int, value int) PARTITIONED BY (load_date date) " +
"CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')")
.run("SHOW TABLES LIKE '" + tableName + "'")
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 1cdeeb6..2615072 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,8 +18,14 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.api.StageType;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -44,11 +50,32 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() +
" for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy);
}
+
+ String tableName = work.getTableName() == null || work.getTableName().isEmpty() ? null : work.getTableName();
+ if (tableName != null) {
+ Table tbl;
+ try {
+ tbl = Hive.get().getTable(work.getDbName(), tableName);
+ ReplicationSpec replicationSpec = work.getReplicationSpec();
+ if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) {
+ // if the event is already replayed, then no need to replay it again.
+ LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
+ replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType());
+ return 0;
+ }
+ } catch (InvalidTableException e) {
+ LOG.info("Table does not exist so, ignoring the operation as it might be a retry(idempotent) case.");
+ return 0;
+ } catch (HiveException e) {
+ LOG.error("Get table failed with exception " + e.getMessage());
+ return 1;
+ }
+ }
+
try {
HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
String user = UserGroupInformation.getCurrentUser().getUserName();
- LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " +
- replPolicy + " with srcTxn " + work.getTxnIds().toString());
+ LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + replPolicy);
switch(work.getOperationType()) {
case REPL_OPEN_TXN:
List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user);
@@ -68,6 +95,15 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
}
return 0;
+ case REPL_ALLOC_WRITE_ID:
+ assert work.getTxnToWriteIdList() != null;
+ String dbName = work.getDbName();
+ String tblName = work.getTableName();
+ List <TxnToWriteId> txnToWriteIdList = work.getTxnToWriteIdList();
+ txnManager.replAllocateTableWriteIdsBatch(dbName, tblName, replPolicy, txnToWriteIdList);
+ LOG.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName +
+ " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tblName);
+ return 0;
default:
LOG.error("Operation Type " + work.getOperationType() + " is not supported ");
return 1;
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
index 9467415..530e9be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.Collections;
import java.util.List;
/**
@@ -33,38 +35,49 @@ public class ReplTxnWork implements Serializable {
private static final long serialVersionUID = 1L;
private String dbName;
private String tableName;
+ private String replPolicy;
private List<Long> txnIds;
+ private List<TxnToWriteId> txnToWriteIdList;
+ private ReplicationSpec replicationSpec;
/**
* OperationType.
* Different kind of events supported for replaying.
*/
public enum OperationType {
- REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN
+ REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID
}
OperationType operation;
- public ReplTxnWork(String dbName, String tableName, List<Long> txnIds, OperationType type) {
+ public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+ List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
this.txnIds = txnIds;
this.dbName = dbName;
this.tableName = tableName;
this.operation = type;
+ this.replPolicy = replPolicy;
+ this.txnToWriteIdList = txnToWriteIdList;
+ this.replicationSpec = replicationSpec;
}
- public ReplTxnWork(String dbName, String tableName, Long txnId, OperationType type) {
- this.txnIds = Lists.newArrayList(txnId);
- this.dbName = dbName;
- this.tableName = tableName;
- this.operation = type;
+ public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+ ReplicationSpec replicationSpec) {
+ this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec);
}
- public List<Long> getTxnIds() {
- return txnIds;
+ public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId,
+ OperationType type, ReplicationSpec replicationSpec) {
+ this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec);
}
- public Long getTxnId(int idx) {
- return txnIds.get(idx);
+ public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type,
+ List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
+ this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec);
+ }
+
+ public List<Long> getTxnIds() {
+ return txnIds;
}
public String getDbName() {
@@ -75,17 +88,19 @@ public class ReplTxnWork implements Serializable {
return tableName;
}
- public String getReplPolicy() {
- if ((dbName == null) || (dbName.isEmpty())) {
- return null;
- } else if ((tableName == null) || (tableName.isEmpty())) {
- return dbName.toLowerCase() + ".*";
- } else {
- return dbName.toLowerCase() + "." + tableName.toLowerCase();
- }
+ public String getReplPolicy() {
+ return replPolicy;
}
public OperationType getOperationType() {
return operation;
}
+
+ public List<TxnToWriteId> getTxnToWriteIdList() {
+ return txnToWriteIdList;
+ }
+
+ public ReplicationSpec getReplicationSpec() {
+ return replicationSpec;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index f566842..7b7fd5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -1013,6 +1013,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
}
}
+
+ public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+ List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
+ try {
+ getMS().replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, srcTxnToWriteIdList);
+ } catch (TException e) {
+ throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+ }
+ }
private static long getHeartbeatInterval(Configuration conf) throws LockException {
// Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 9057bb9..78eedd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -77,6 +78,11 @@ class DummyTxnManager extends HiveTxnManagerImpl {
return 0L;
}
@Override
+ public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+ List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
+ return;
+ }
+ @Override
public HiveLockManager getLockManager() throws LockException {
if (lockMgr == null) {
boolean supportConcurrency =
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index e0e235b..ec11fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -265,6 +266,17 @@ public interface HiveTxnManager {
long getTableWriteId(String dbName, String tableName) throws LockException;
/**
+ * Allocates write id for each transaction in the list.
+ * @param dbName database name
+ * @param tableName the name of the table to allocate the write id
+ * @param replPolicy used by replication task to identify the source cluster
+ * @param srcTxnToWriteIdList List of txn id to write id Map
+ * @throws LockException
+ */
+ void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+ List<TxnToWriteId> srcTxnToWriteIdList) throws LockException;
+
+ /**
* Should be though of more as a unique write operation ID in a given txn (at QueryPlan level).
* Each statement writing data within a multi statement txn should have a unique WriteId.
* Even a single statement, (e.g. Merge, multi-insert may generates several writes).
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index dae18fb..f1c4d98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -428,4 +428,14 @@ public final class HiveUtils {
return null;
}
+
+ public static String getReplPolicy(String dbName, String tableName) {
+ if ((dbName == null) || (dbName.isEmpty())) {
+ return null;
+ } else if ((tableName == null) || (tableName.isEmpty())) {
+ return dbName.toLowerCase() + ".*";
+ } else {
+ return dbName.toLowerCase() + "." + tableName.toLowerCase();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index f7c78e2..1fda478 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -244,16 +244,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
}
}
- private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
- // This functions filters out all the events which are already replayed. This can be done only
- // for transaction related events as for other kind of events we can not gurantee that the last
- // repl id stored in the database/table is valid.
- if ((dumpType != DumpType.EVENT_ABORT_TXN) &&
- (dumpType != DumpType.EVENT_OPEN_TXN) &&
- (dumpType != DumpType.EVENT_COMMIT_TXN)) {
- return true;
+ private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
+ if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+ String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+ if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+ LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+ + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
+ return false;
+ }
}
+ return true;
+ }
+ private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
// if database itself is null then we can not filter out anything.
if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
return true;
@@ -261,41 +264,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
Database database;
try {
database = Hive.get().getDatabase(dbNameOrPattern);
+ return isEventNotReplayed(database.getParameters(), dir, dumpType);
} catch (HiveException e) {
- LOG.error("failed to get the database " + dbNameOrPattern);
- throw new SemanticException(e);
- }
- String replLastId;
- Map<String, String> params = database.getParameters();
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
- if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
- LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
- + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
- return false;
- }
+ //may be the db is getting created in this load
+ LOG.debug("failed to get the database " + dbNameOrPattern);
+ return true;
}
} else {
Table tbl;
try {
tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
+ return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
} catch (HiveException e) {
- LOG.error("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
- throw new SemanticException(e);
- }
- if (tbl != null) {
- Map<String, String> params = tbl.getParameters();
- if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
- String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
- if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
- LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
- + " is already replayed. LastReplId - " + Long.parseLong(replLastId));
- return false;
- }
- }
+ // may be the table is getting created in this load
+ LOG.debug("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
+ return true;
}
}
- return true;
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 5fab0d2..2e42267 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.CommitTxnHandler;
import org.apache.hadoop.hive.ql.parse.repl.load.message.AbortTxnHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AllocWriteIdHandler;
public enum DumpType {
@@ -204,6 +205,12 @@ public enum DumpType {
public MessageHandler handler() {
return new AbortTxnHandler();
}
+ },
+ EVENT_ALLOC_WRITE_ID("EVENT_ALLOC_WRITE_ID") {
+ @Override
+ public MessageHandler handler() {
+ return new AllocWriteIdHandler();
+ }
};
String type = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
new file mode 100644
index 0000000..38efbd7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AllocWriteIdHandler extends AbstractEventHandler {
+ AllocWriteIdHandler(NotificationEvent event) {
+ super(event);
+ }
+
+ @Override
+ public void handle(Context withinContext) throws Exception {
+ LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), event.getMessage());
+ DumpMetaData dmd = withinContext.createDmd(this);
+ dmd.setPayload(event.getMessage());
+ dmd.write();
+ }
+
+ @Override
+ public DumpType dumpType() {
+ return DumpType.EVENT_ALLOC_WRITE_ID;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 10ff21c..a1d61f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -53,6 +53,7 @@ public class EventHandlerFactory {
register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class);
register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class);
register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class);
+ register(MessageFactory.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
}
static void register(String event, Class<? extends EventHandler> handlerClazz) {
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index 5b2c85b..3c28f84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.io.Serializable;
import java.util.Collections;
@@ -43,7 +44,8 @@ public class AbortTxnHandler extends AbstractMessageHandler {
AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload());
Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
- new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN),
+ new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+ msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()),
context.hiveConf
);
updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
new file mode 100644
index 0000000..ef51396
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * AllocWriteIdHandler
+ * Target(Load) side handler for alloc write id event.
+ */
+public class AllocWriteIdHandler extends AbstractMessageHandler {
+ @Override
+ public List<Task<? extends Serializable>> handle(Context context)
+ throws SemanticException {
+ if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
+ context.log.error("Cannot load alloc write id event as acid is not enabled");
+ throw new SemanticException("Cannot load alloc write id event as acid is not enabled");
+ }
+
+ AllocWriteIdMessage msg =
+ deserializer.getAllocWriteIdMessage(context.dmd.getPayload());
+
+ String dbName = (context.dbName != null && !context.dbName.isEmpty() ? context.dbName : msg.getDB());
+
+ // The context table name can be null if repl load is done on a full db.
+ // But we need table name for alloc write id and that is received from source.
+ String tableName = (context.tableName != null && !context.tableName.isEmpty() ? context.tableName : msg
+ .getTableName());
+
+ // Repl policy should be created based on the table name in context.
+ ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(dbName, context.tableName), dbName, tableName,
+ ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec());
+
+ Task<? extends Serializable> allocWriteIdTask = TaskFactory.get(work, context.hiveConf);
+ context.log.info("Added alloc write id task : {}", allocWriteIdTask.getId());
+ updatedMetadata.set(context.dmd.getEventTo().toString(), dbName, tableName, null);
+ return Collections.singletonList(allocWriteIdTask);
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 461a0f1..1274213 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.io.Serializable;
import java.util.Collections;
@@ -41,10 +42,10 @@ public class CommitTxnHandler extends AbstractMessageHandler {
}
CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());
-
Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
- new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(),
- ReplTxnWork.OperationType.REPL_COMMIT_TXN), context.hiveConf
+ new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+ msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()),
+ context.hiveConf
);
updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index c6349ea..a502117 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.io.Serializable;
import java.util.Collections;
@@ -42,7 +43,8 @@ public class OpenTxnHandler extends AbstractMessageHandler {
OpenTxnMessage msg = deserializer.getOpenTxnMessage(context.dmd.getPayload());
Task<ReplTxnWork> openTxnTask = TaskFactory.get(
- new ReplTxnWork(context.dbName, context.tableName, msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN),
+ new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+ msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()),
context.hiveConf
);
updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 45890ed..f27a5b0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -481,8 +481,9 @@ public class TestCompactionTxnHandler {
OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
long txnId = openTxns.getTxn_ids().get(0);
- AllocateTableWriteIdsResponse writeIds
- = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), dbName, tableName));
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
+ rqst.setTxnIds(openTxns.getTxn_ids());
+ AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
assertEquals(1, writeId);
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index f1da15b..372c709 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.util.StringUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
@@ -156,8 +157,9 @@ public class TestTxnHandler {
List<String> parts = new ArrayList<String>();
parts.add("p=1");
- AllocateTableWriteIdsResponse writeIds
- = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(3L), "default", "T"));
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "T");
+ rqst.setTxnIds(Collections.singletonList(3L));
+ AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
assertEquals(3, writeIds.getTxnToWriteIds().get(0).getTxnId());
assertEquals(1, writeId);
@@ -1544,24 +1546,117 @@ public class TestTxnHandler {
Assert.assertTrue("regex should be retryable", result);
}
- @Test
- public void testReplOpenTxn() throws Exception {
- int numTxn = 50000;
+ private List<Long> replOpenTxnForTest(long startId, int numTxn, String replPolicy)
+ throws Exception {
conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, numTxn);
+ long lastId = startId + numTxn - 1;
OpenTxnRequest rqst = new OpenTxnRequest(numTxn, "me", "localhost");
- rqst.setReplPolicy("default.*");
- rqst.setReplSrcTxnIds(LongStream.rangeClosed(1, numTxn)
+ rqst.setReplPolicy(replPolicy);
+ rqst.setReplSrcTxnIds(LongStream.rangeClosed(startId, lastId)
.boxed().collect(Collectors.toList()));
+
OpenTxnsResponse openedTxns = txnHandler.openTxns(rqst);
List<Long> txnList = openedTxns.getTxn_ids();
assertEquals(txnList.size(), numTxn);
- for (long i = 0; i < numTxn; i++) {
- long txnId = txnList.get((int) i);
- assertEquals(i+1, txnId);
+ int numTxnPresentNow = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS where TXN_ID >= " +
+ txnList.get(0) + " and TXN_ID <= " + txnList.get(numTxn - 1));
+ assertEquals(numTxn, numTxnPresentNow);
+
+ checkReplTxnForTest(startId, lastId, replPolicy, txnList);
+ return txnList;
+ }
+
+ private void replAbortTxnForTest(List<Long> txnList, String replPolicy)
+ throws Exception {
+ for (Long txnId : txnList) {
+ AbortTxnRequest rqst = new AbortTxnRequest(txnId);
+ rqst.setReplPolicy(replPolicy);
+ txnHandler.abortTxn(rqst);
}
+ checkReplTxnForTest(txnList.get(0), txnList.get(txnList.size() - 1), replPolicy, new ArrayList<>());
+ }
+
+ private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPolicy, List<Long> targetTxnId)
+ throws Exception {
+ String[] output = TxnDbUtil.queryToString(conf, "select RTM_TARGET_TXN_ID from REPL_TXN_MAP where " +
+ " RTM_SRC_TXN_ID >= " + startTxnId + "and RTM_SRC_TXN_ID <= " + endTxnId +
+ " and RTM_REPL_POLICY = \'" + replPolicy + "\'").split("\n");
+ assertEquals(output.length - 1, targetTxnId.size());
+ for (int idx = 1; idx < output.length; idx++) {
+ long txnId = Long.parseLong(output[idx].trim());
+ assertEquals(txnId, targetTxnId.get(idx-1).longValue());
+ }
+ }
+
+ @Test
+ public void testReplOpenTxn() throws Exception {
+ int numTxn = 50000;
+ String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n");
+ long startTxnId = Long.parseLong(output[1].trim());
+ List<Long> txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*");
+ assert(txnList.size() == numTxn);
txnHandler.abortTxns(new AbortTxnsRequest(txnList));
}
+ @Test
+ public void testReplAllocWriteId() throws Exception {
+ int numTxn = 2;
+ String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n");
+ long startTxnId = Long.parseLong(output[1].trim());
+ List<Long> srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1)
+ .boxed().collect(Collectors.toList());
+ List<Long> targetTxnList = replOpenTxnForTest(startTxnId, numTxn, "destdb.*");
+ assert(targetTxnList.size() == numTxn);
+
+ List<TxnToWriteId> srcTxnToWriteId;
+ List<TxnToWriteId> targetTxnToWriteId;
+ srcTxnToWriteId = new ArrayList<>();
+
+ for (int idx = 0; idx < numTxn; idx++) {
+ srcTxnToWriteId.add(new TxnToWriteId(startTxnId+idx, idx+1));
+ }
+ AllocateTableWriteIdsRequest allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl1");
+ allocMsg.setReplPolicy("destdb.*");
+ allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+ targetTxnToWriteId = txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+ for (int idx = 0; idx < targetTxnList.size(); idx++) {
+ assertEquals(targetTxnToWriteId.get(idx).getWriteId(), srcTxnToWriteId.get(idx).getWriteId());
+ assertEquals(Long.valueOf(targetTxnToWriteId.get(idx).getTxnId()), targetTxnList.get(idx));
+ }
+
+ // idempotent case for destdb db
+ targetTxnToWriteId = txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+ for (int idx = 0; idx < targetTxnList.size(); idx++) {
+ assertEquals(targetTxnToWriteId.get(idx).getWriteId(), srcTxnToWriteId.get(idx).getWriteId());
+ assertEquals(Long.valueOf(targetTxnToWriteId.get(idx).getTxnId()), targetTxnList.get(idx));
+ }
+
+ //invalid case
+ boolean failed = false;
+ srcTxnToWriteId = new ArrayList<>();
+ srcTxnToWriteId.add(new TxnToWriteId(startTxnId, 2*numTxn+1));
+ allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl2");
+ allocMsg.setReplPolicy("destdb.*");
+ allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+ try {
+ txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+ } catch (IllegalStateException e) {
+ failed = true;
+ }
+ assertTrue(failed);
+
+ replAbortTxnForTest(srcTxnIdList, "destdb.*");
+
+ // Test for aborted transactions
+ failed = false;
+ try {
+ txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+ } catch (RuntimeException e) {
+ failed = true;
+ }
+ assertTrue(failed);
+ }
+
private void updateTxns(Connection conn) throws SQLException {
Statement stmt = conn.createStatement();
stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 8406caa..0926663 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1031,9 +1031,9 @@ public class TestDbTxnManager2 {
//update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET"));
- AllocateTableWriteIdsResponse writeIds
- = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr2.getCurrentTxnId()),
- "default", "tab2"));
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+ rqst.setTxnIds(Collections.singletonList(txnMgr2.getCurrentTxnId()));
+ AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
Assert.assertEquals(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getWriteId(),
@@ -1054,8 +1054,9 @@ public class TestDbTxnManager2 {
//update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET"));
- writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr2.getCurrentTxnId()),
- "default", "tab2"));
+ rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+ rqst.setTxnIds(Collections.singletonList(txnMgr2.getCurrentTxnId()));
+ writeIds = txnHandler.allocateTableWriteIds(rqst);
Assert.assertEquals(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getWriteId(),
@@ -1074,8 +1075,9 @@ public class TestDbTxnManager2 {
checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1", true));//no rows match
txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
- writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr.getCurrentTxnId()),
- "default", "tab2"));
+ rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+ rqst.setTxnIds(Collections.singletonList(txnMgr.getCurrentTxnId()));
+ writeIds = txnHandler.allocateTableWriteIds(rqst);
Assert.assertEquals(txnMgr.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
//so generate empty Dyn Part call
@@ -1119,8 +1121,9 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
txnMgr.rollbackTxn();
- AllocateTableWriteIdsResponse writeIds
- = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnId), "default", "TAB_PART"));
+ AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "TAB_PART");
+ rqst.setTxnIds(Collections.singletonList(txnId));
+ AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
Assert.assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeIds.getTxnToWriteIds().get(0).getWriteId(),
http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 083c671..124c97e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -202,7 +202,8 @@ public abstract class CompactorTest {
protected long allocateWriteId(String dbName, String tblName, long txnid)
throws MetaException, TxnAbortedException, NoSuchTxnException {
AllocateTableWriteIdsRequest awiRqst
- = new AllocateTableWriteIdsRequest(Collections.singletonList(txnid), dbName, tblName);
+ = new AllocateTableWriteIdsRequest(dbName, tblName);
+ awiRqst.setTxnIds(Collections.singletonList(txnid));
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
return awiResp.getTxnToWriteIds().get(0).getWriteId();
}
@@ -254,7 +255,8 @@ public abstract class CompactorTest {
protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted)
throws MetaException, NoSuchTxnException, TxnAbortedException {
OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
- AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(rsp.getTxn_ids(), dbName, tblName);
+ AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
+ awiRqst.setTxnIds(rsp.getTxn_ids());
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
int i = 0;
for (long tid : rsp.getTxn_ids()) {