Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/24 11:43:14 UTC
[18/19] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index a14802f..940e381 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -199,7 +199,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
cmRoot,
getHive(),
conf,
- getNewEventOnlyReplicationSpec(ev.getEventId())
+ getNewEventOnlyReplicationSpec(ev.getEventId()),
+ work.dbNameOrPattern,
+ work.tableNameOrPattern
);
EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
eventHandler.handle(context);
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7fce67f..16ba82e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -71,21 +70,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.io.Serializable;
import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-
/**
* Utilities that are shared by all of the ACID input and output formats. They
@@ -1907,6 +1892,28 @@ public class AcidUtils {
return null;
}
+ //Get the first level acid directory (if any) from a given path
+ public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem) throws IOException {
+ if (dataPath == null) {
+ return null;
+ }
+ String firstLevelAcidDir = getAcidSubDir(dataPath);
+ if (firstLevelAcidDir != null) {
+ return firstLevelAcidDir;
+ }
+
+ String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem);
+ if (acidDirPath == null) {
+ return null;
+ }
+
+ // We only need the directory path, so do not append the file name
+ if (fileSystem.isDirectory(dataPath)) {
+ return acidDirPath + Path.SEPARATOR + dataPath.getName();
+ }
+ return acidDirPath;
+ }
+
public static boolean isAcidEnabled(HiveConf hiveConf) {
String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
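The new getFirstLevelAcidDirPath() walks up from a file towards the table/partition root until it finds the first ACID base/delta directory, then re-appends any directory components below it; the result is later recorded as the sub-directory of each replicated file. Below is a minimal standalone sketch of the same recursion, for illustration only; acidSubDirName() is a stand-in for the real AcidUtils.getAcidSubDir() check.

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcidDirLookupSketch {
  // Stand-in for AcidUtils.getAcidSubDir(): return the name if the path looks
  // like an ACID base/delta directory, otherwise null.
  static String acidSubDirName(Path p) {
    String name = p.getName();
    return (name.startsWith("base_") || name.startsWith("delta_")
        || name.startsWith("delete_delta_")) ? name : null;
  }

  // Mirrors getFirstLevelAcidDirPath(): find the first ACID directory on the
  // way up, then re-append the directory components between it and dataPath.
  static String firstLevelAcidDirPath(Path dataPath, FileSystem fs) throws IOException {
    if (dataPath == null) {
      return null;
    }
    String acidDir = acidSubDirName(dataPath);
    if (acidDir != null) {
      return acidDir;
    }
    String parentAcidPath = firstLevelAcidDirPath(dataPath.getParent(), fs);
    if (parentAcidPath == null) {
      return null;
    }
    // Only directory components are re-appended; a trailing file name is dropped.
    return fs.isDirectory(dataPath)
        ? parentAcidPath + Path.SEPARATOR + dataPath.getName() : parentAcidPath;
  }
}

For a file such as warehouse/t/delta_0000005_0000005_0000/subdir/000000_0, calling this on its parent yields "delta_0000005_0000005_0000/subdir", which is what addInsertFileInformation() in Hive.java adds to the insert event's sub-directory list further down in this patch.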
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index bcc0508..ec8527e 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -26,30 +26,32 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
import org.apache.hadoop.hive.llap.io.api.LlapIo;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
@@ -62,8 +64,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -78,10 +78,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.Ref;
import org.apache.hive.common.util.ReflectionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -460,8 +457,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
TableDesc table, List<InputSplit> result)
throws IOException {
+ String tableName = table.getTableName();
ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(
- conf, table.getTableName());
+ conf, tableName == null ? null : HiveStringUtils.normalizeIdentifier(tableName));
ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList);
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 4fd1d4e..78980fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.Context;
@@ -638,14 +639,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
@Override
- public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException {
+ public void replCommitTxn(CommitTxnRequest rqst) throws LockException {
try {
- getMS().replCommitTxn(srcTxnId, replPolicy);
+ getMS().replCommitTxn(rqst);
} catch (NoSuchTxnException e) {
- LOG.error("Metastore could not find " + JavaUtils.txnIdToString(srcTxnId));
- throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(srcTxnId));
+ LOG.error("Metastore could not find " + JavaUtils.txnIdToString(rqst.getTxnid()));
+ throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(rqst.getTxnid()));
} catch (TxnAbortedException e) {
- LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), e.getMessage());
+ LockException le = new LockException(e, ErrorMsg.TXN_ABORTED,
+ JavaUtils.txnIdToString(rqst.getTxnid()), e.getMessage());
LOG.error(le.getMessage());
throw le;
} catch (TException e) {
@@ -1013,7 +1015,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
assert isTxnOpen();
return stmtId++;
}
-
+ @Override
+ public int getCurrentStmtId() {
+ assert isTxnOpen();
+ return stmtId;
+ }
@Override
public long getTableWriteId(String dbName, String tableName) throws LockException {
assert isTxnOpen();
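replCommitTxn() now takes the full thrift CommitTxnRequest rather than a (replPolicy, srcTxnId) pair, so the commit can also carry write-event information for ACID/MM tables. A hedged sketch of how a replication caller might build the request follows; the one-argument constructor and the setReplPolicy setter are assumptions based on the thrift-generated code (txnid is the struct's required field).

import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;

public class ReplCommitSketch {
  // Commit a replicated transaction on the target cluster. The repl policy
  // ("db.*" or "db.table") lets the metastore map the source txn id to the
  // transaction it opened on the target.
  static void commitReplicatedTxn(HiveTxnManager txnMgr, long srcTxnId, String replPolicy)
      throws LockException {
    CommitTxnRequest rqst = new CommitTxnRequest(srcTxnId); // assumed thrift constructor (txnid required)
    rqst.setReplPolicy(replPolicy); // e.g. HiveUtils.getReplPolicy(dbName, tableName); assumed setter
    txnMgr.replCommitTxn(rqst);
  }
}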
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ab9d67e..1feddeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
return 0;
}
@Override
+ public int getCurrentStmtId() {
+ return 0;
+ }
+ @Override
public long getTableWriteId(String dbName, String tableName) throws LockException {
return 0L;
}
@@ -220,7 +225,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
}
@Override
- public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException {
+ public void replCommitTxn(CommitTxnRequest rqst) throws LockException {
// No-op
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 5f68e08..9575552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.ql.Context;
@@ -61,11 +62,11 @@ public interface HiveTxnManager {
/**
* Commit the transaction in target cluster.
- * @param replPolicy Replication policy to uniquely identify the source cluster.
- * @param srcTxnId The id of the transaction at the source cluster
+ *
+ * @param rqst Commit transaction request having information related to commit txn and write events.
* @throws LockException in case of failure to commit the transaction.
*/
- void replCommitTxn(String replPolicy, long srcTxnId) throws LockException;
+ void replCommitTxn(CommitTxnRequest rqst) throws LockException;
/**
* Abort the transaction in target cluster.
@@ -295,6 +296,9 @@ public interface HiveTxnManager {
*/
int getStmtIdAndIncrement();
+ // Can be used by operation to set the stmt id when allocation is done somewhere else.
+ int getCurrentStmtId();
+
/**
* Acquire the materialization rebuild lock for a given view. We need to specify the fully
* qualified name of the materialized view and the open transaction ID so we can identify
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 2e05e15..953cd1d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -161,6 +161,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
@@ -1762,8 +1763,14 @@ public class Hive {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", "FileMoves");
- List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ List<Path> newFiles = null;
+ // If config is set, table is not temporary and partition being inserted exists, capture
+ // the list of files added. For not yet existing partitions (insert overwrite to new partition
+ // or dynamic partition inserts), the add partition event will capture the list of files added.
+ if (areEventsForDmlNeeded(tbl, oldPart)) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
// Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
// like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
@@ -1776,8 +1783,8 @@ public class Hive {
Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
}
assert !isAcidIUDoperation;
- if (areEventsForDmlNeeded(tbl, oldPart)) {
- newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
+ if (newFiles != null) {
+ listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTableWrite ? isInsertOverwrite : false, newFiles);
}
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
@@ -1826,8 +1833,15 @@ public class Hive {
// or dynamic partition inserts), the add partition event will capture the list of files added.
// Generate an insert event only if inserting into an existing partition
// When inserting into a new partition, the add partition event takes care of insert event
- if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
- fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+ if ((null != oldPart) && (null != newFiles)) {
+ if (isTxnTable) {
+ addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ } else {
+ fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+ }
+ } else {
+ LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+ + "partition that does not exist yet. Skipping generating INSERT event.");
}
// column stats will be inaccurate
@@ -1897,6 +1911,12 @@ public class Hive {
}
throw e;
}
+
+ // For acid table, add the acid_write event with file list at the time of load itself. But
+ // it should be done after partition is created.
+ if (isTxnTable && (null != newFiles)) {
+ addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+ }
} else {
setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart);
}
@@ -1949,50 +1969,47 @@ public class Hive {
}
private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
- return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
+ // For ACID IUD, add-partition is a metadata-only operation, so the list of newly added files
+ // must be recorded in the TXN_WRITE_NOTIFICATION_LOG table.
+ return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() &&
+ ((null != oldPart) || AcidUtils.isTransactionalTable(tbl));
+ }
+
+ private void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List<Path> newFiles) throws IOException {
+ // list out all the files/directory in the path
+ FileStatus[] acidFiles;
+ acidFiles = srcFs.listStatus(acidDir);
+ if (acidFiles == null) {
+ LOG.debug("No files added by this query in: " + acidDir);
+ return;
+ }
+ for (FileStatus acidFile : acidFiles) {
+ // need to list out only files, ignore folders.
+ if (!acidFile.isDirectory()) {
+ newFiles.add(acidFile.getPath());
+ } else {
+ listFilesInsideAcidDirectory(acidFile.getPath(), srcFs, newFiles);
+ }
+ }
}
- private List<Path> listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId) throws HiveException {
- List<Path> newFiles = new ArrayList<Path>();
- final String filePrefix = AcidUtils.deltaSubdir(writeId, writeId, stmtId);
- FileStatus[] srcs;
- FileSystem srcFs;
+ private void listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId,
+ boolean isInsertOverwrite, List<Path> newFiles) throws HiveException {
+ Path acidDir = new Path(loadPath, AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId));
try {
- srcFs = loadPath.getFileSystem(conf);
- srcs = srcFs.listStatus(loadPath);
+ FileSystem srcFs = loadPath.getFileSystem(conf);
+ if (srcFs.exists(acidDir) && srcFs.isDirectory(acidDir)){
+ // list out all the files in the path
+ listFilesInsideAcidDirectory(acidDir, srcFs, newFiles);
+ } else {
+ LOG.info("directory does not exist: " + acidDir);
+ return;
+ }
} catch (IOException e) {
LOG.error("Error listing files", e);
throw new HiveException(e);
}
- if (srcs == null) {
- LOG.info("No sources specified: " + loadPath);
- return newFiles;
- }
- PathFilter subdirFilter = null;
-
- // Note: just like the move path, we only do one level of recursion.
- for (FileStatus src : srcs) {
- if (src.isDirectory()) {
- if (subdirFilter == null) {
- subdirFilter = new PathFilter() {
- @Override
- public boolean accept(Path path) {
- return path.getName().startsWith(filePrefix);
- }
- };
- }
- try {
- for (FileStatus srcFile : srcFs.listStatus(src.getPath(), subdirFilter)) {
- newFiles.add(srcFile.getPath());
- }
- } catch (IOException e) {
- throw new HiveException(e);
- }
- } else if (src.getPath().getName().startsWith(filePrefix)) {
- newFiles.add(src.getPath());
- }
- }
- return newFiles;
+ return;
}
private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
@@ -2335,13 +2352,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
boolean isSkewedStoreAsSubdir, boolean isAcidIUDoperation, boolean hasFollowingStatsTask,
Long writeId, int stmtId, boolean isInsertOverwrite) throws HiveException {
- List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ List<Path> newFiles = null;
Table tbl = getTable(tableName);
assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
+ if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+ newFiles = Collections.synchronizedList(new ArrayList<Path>());
+ }
+
// Note: this assumes both paths are qualified; which they are, currently.
if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
/**
@@ -2353,7 +2374,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
Utilities.FILE_OP_LOGGER.debug(
"not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
}
- newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
+
+ //new files list is required only for event notification.
+ if (newFiles != null) {
+ listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTable ? isInsertOverwrite : false, newFiles);
+ }
} else {
// Either a non-MM query, or a load into MM table from an external source.
Path tblPath = tbl.getPath();
@@ -2421,11 +2446,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
alterTable(tbl, environmentContext);
- if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
- fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+ if (AcidUtils.isTransactionalTable(tbl)) {
+ addWriteNotificationLog(tbl, null, newFiles, writeId);
} else {
- fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
-
+ fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
}
}
@@ -2677,6 +2701,48 @@ private void constructOneLBLocationMap(FileStatus fSta,
tpart.getSd().setLocation(partPath);
}
+ private void addWriteNotificationLog(Table tbl, Map<String, String> partitionSpec,
+ List<Path> newFiles, Long writeId) throws HiveException {
+ if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+ LOG.debug("write notification log is ignored as dml event logging is disabled");
+ return;
+ }
+
+ if (tbl.isTemporary()) {
+ LOG.debug("write notification log is ignored as " + tbl.getTableName() + " is temporary : " + writeId);
+ return;
+ }
+
+ if (newFiles == null || newFiles.isEmpty()) {
+ LOG.debug("write notification log is ignored as file list is empty");
+ return;
+ }
+
+ LOG.debug("adding write notification log for operation " + writeId + " table " + tbl.getCompleteName() +
+ "partition " + partitionSpec + " list of files " + newFiles);
+
+ try {
+ FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
+ Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+
+ InsertEventRequestData insertData = new InsertEventRequestData();
+ insertData.setReplace(true);
+
+ WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(txnId, writeId,
+ tbl.getDbName(), tbl.getTableName(), insertData);
+ addInsertFileInformation(newFiles, fileSystem, insertData);
+
+ if (partitionSpec != null && !partitionSpec.isEmpty()) {
+ for (FieldSchema fs : tbl.getPartitionKeys()) {
+ rqst.addToPartitionVals(partitionSpec.get(fs.getName()));
+ }
+ }
+ getSynchronizedMSC().addWriteNotificationLog(rqst);
+ } catch (IOException | TException e) {
+ throw new HiveException(e);
+ }
+ }
+
private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
throws HiveException {
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
@@ -2753,6 +2819,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
InsertEventRequestData insertData) throws IOException {
insertData.addToFilesAdded(p.toString());
FileChecksum cksum = fileSystem.getFileChecksum(p);
+ String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(p.getParent(), fileSystem);
// File checksum is not implemented for local filesystem (RawLocalFileSystem)
if (cksum != null) {
String checksumString =
@@ -2762,6 +2829,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
// Add an empty checksum string for filesystems that don't generate one
insertData.addToFilesAddedChecksum("");
}
+
+ // acid dir will be present only for acid write operations.
+ if (acidDirPath != null) {
+ insertData.addToSubDirectoryList(acidDirPath);
+ }
}
public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData)
@@ -3718,7 +3790,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
@Override
public Void call() throws HiveException {
SessionState.setCurrentSessionState(parentSession);
- final String group = srcStatus.getGroup();
try {
boolean success = false;
if (destFs instanceof DistributedFileSystem) {
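For transactional tables, loadTable()/loadPartition() now collect the files the statement wrote into its base/delta directory (recursing into sub-directories) and record them via addWriteNotificationLog() instead of firing an insert event. Below is a minimal, self-contained sketch of that recursive listing using only the Hadoop FileSystem API; deltaName stands in for the AcidUtils.baseOrDeltaSubdir() value computed in the patch.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcidFileListingSketch {
  // Collect every regular file under an ACID base/delta directory, descending
  // into sub-directories, mirroring listFilesInsideAcidDirectory().
  static void listFilesRecursively(Path dir, FileSystem fs, List<Path> newFiles)
      throws IOException {
    FileStatus[] children = fs.listStatus(dir);
    if (children == null) {
      return; // defensive: nothing under this directory
    }
    for (FileStatus child : children) {
      if (child.isDirectory()) {
        listFilesRecursively(child.getPath(), fs, newFiles);
      } else {
        newFiles.add(child.getPath());
      }
    }
  }

  // Mirrors listFilesCreatedByQuery(): only look inside the directory the
  // current statement wrote to; if it does not exist, nothing was written.
  static List<Path> filesCreatedByQuery(Path loadPath, String deltaName, FileSystem fs)
      throws IOException {
    List<Path> newFiles = new ArrayList<>();
    Path acidDir = new Path(loadPath, deltaName);
    if (fs.exists(acidDir) && fs.isDirectory(acidDir)) {
      listFilesRecursively(acidDir, fs, newFiles);
    }
    return newFiles;
  }
}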
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index f1c4d98..e04a0f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.metadata;
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -431,11 +432,19 @@ public final class HiveUtils {
public static String getReplPolicy(String dbName, String tableName) {
if ((dbName == null) || (dbName.isEmpty())) {
- return null;
+ return "*.*";
} else if ((tableName == null) || (tableName.isEmpty())) {
return dbName.toLowerCase() + ".*";
} else {
return dbName.toLowerCase() + "." + tableName.toLowerCase();
}
}
+
+ public static Path getDumpPath(Path root, String dbName, String tableName) {
+ assert (dbName != null);
+ if ((tableName != null) && (!tableName.isEmpty())) {
+ return new Path(root, dbName + "." + tableName);
+ }
+ return new Path(root, dbName);
+ }
}
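getReplPolicy() now falls back to the wildcard policy "*.*" instead of null when no database is given, and the new getDumpPath() picks the per-table sub-directory that CommitTxnHandler uses when one commit touches several tables. A short usage sketch of the resulting mapping (the paths are illustrative only):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;

public class ReplPolicySketch {
  public static void main(String[] args) {
    // Policy strings that scope a replication request:
    System.out.println(HiveUtils.getReplPolicy(null, null));        // "*.*"          (everything)
    System.out.println(HiveUtils.getReplPolicy("Sales", null));     // "sales.*"      (one database)
    System.out.println(HiveUtils.getReplPolicy("Sales", "Orders")); // "sales.orders" (one table)

    // Per-table dump directory under an event root:
    Path eventRoot = new Path("/repl/dump/next/17");                // illustrative path
    System.out.println(HiveUtils.getDumpPath(eventRoot, "sales", "orders")); // .../17/sales.orders
    System.out.println(HiveUtils.getDumpPath(eventRoot, "sales", null));     // .../17/sales
  }
}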
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d34de61..eb594f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
import org.apache.hadoop.hive.ql.plan.DDLWork;
import org.apache.hadoop.hive.ql.plan.DropTableDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -249,9 +250,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
throw new HiveException(e);
}
+ boolean inReplicationScope = false;
if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
tblDesc.setReplicationSpec(replicationSpec);
StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+ inReplicationScope = true;
}
if (isExternalSet) {
@@ -275,7 +278,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
for (Partition partition : partitions) {
// TODO: this should ideally not create AddPartitionDesc per partition
AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
- if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
+ if (inReplicationScope){
StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
}
partitionDescs.add(partsDesc);
@@ -335,13 +338,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
//if importing into existing transactional table or will create a new transactional table
//(because Export was done from transactional table), need a writeId
// Explain plan doesn't open a txn and hence no need to allocate write id.
- if (x.getCtx().getExplainConfig() == null) {
+ // In replication flow, no need to allocate write id. It will be allocated using the alloc write id event.
+ if (x.getCtx().getExplainConfig() == null && !inReplicationScope) {
writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
stmtId = txnMgr.getStmtIdAndIncrement();
}
}
- if (!replicationSpec.isInReplicationScope()) {
+ if (!inReplicationScope) {
createRegularImportTasks(
tblDesc, partitionDescs,
isPartSpecSet, replicationSpec, table,
@@ -390,7 +394,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
Path destPath = null, loadPath = null;
LoadFileType lft;
- if (AcidUtils.isTransactionalTable(table)) {
+ if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
String mmSubdir = replace ? AcidUtils.baseDir(writeId)
: AcidUtils.deltaSubdir(writeId, writeId, stmtId);
destPath = new Path(tgtPath, mmSubdir);
@@ -428,13 +432,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
}
- LoadTableDesc loadTableWork = new LoadTableDesc(
- loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
- loadTableWork.setStmtId(stmtId);
+ MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
+
+
+ if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(destPath),
+ Collections.singletonList(tgtPath),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
+ moveWork.setNeedCleanTarget(false);
+ } else {
+ LoadTableDesc loadTableWork = new LoadTableDesc(
+ loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
+ loadTableWork.setStmtId(stmtId);
+ moveWork.setLoadTableWork(loadTableWork);
+ }
+
//if Importing into existing table, FileFormat is checked by
// ImportSemanticAnalzyer.checked checkTable()
- MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
- Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
+ Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadTableTask);
x.getTasks().add(copyTask);
return loadTableTask;
@@ -498,8 +515,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
+ partSpecToString(partSpec.getPartSpec())
+ " with source location: " + srcLocation);
Path tgtLocation = new Path(partSpec.getLocation());
- Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ?
- x.getCtx().getExternalTmpPath(tgtLocation)
+ // In replication scope the write id will be invalid, so use a staging directory
+ Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+ replicationSpec.isInReplicationScope();
+ Path destPath = useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
: new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
Path moveTaskSrc = !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation;
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -523,17 +542,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
Task<?> addPartTask = TaskFactory.get(
new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+ MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
+ null, null, false);
+
// Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
// See setLoadFileType and setIsAcidIow calls elsewhere for an example.
- LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
- partSpec.getPartSpec(),
- replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
- writeId);
- loadTableWork.setStmtId(stmtId);
- loadTableWork.setInheritTableSpecs(false);
- Task<?> loadPartTask = TaskFactory.get(
- new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false),
- x.getConf());
+ if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
+ LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+ Collections.singletonList(destPath),
+ Collections.singletonList(tgtLocation),
+ true, null, null);
+ moveWork.setMultiFilesDesc(loadFilesWork);
+ moveWork.setNeedCleanTarget(false);
+ } else {
+ LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
+ partSpec.getPartSpec(),
+ replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+ writeId);
+ loadTableWork.setStmtId(stmtId);
+ loadTableWork.setInheritTableSpecs(false);
+ moveWork.setLoadTableWork(loadTableWork);
+ }
+
+ Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
copyTask.addDependentTask(loadPartTask);
addPartTask.addDependentTask(loadPartTask);
x.getTasks().add(copyTask);
@@ -1005,7 +1036,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
t.addDependentTask(
addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
- updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+ addPartitionDesc.getPartition(0).getPartSpec());
}
}
} else {
@@ -1057,13 +1089,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
x.getTasks().add(addSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
if (updatedMetadata != null) {
- updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+ addPartitionDesc.getPartition(0).getPartSpec());
}
} else {
x.getTasks().add(alterSinglePartition(
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
if (updatedMetadata != null) {
- updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+ addPartitionDesc.getPartition(0).getPartSpec());
}
}
} else {
@@ -1078,7 +1112,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
}
if (updatedMetadata != null) {
- updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+ updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+ addPartitionDesc.getPartition(0).getPartSpec());
}
if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
lockType = WriteEntity.WriteType.DDL_SHARED;
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index f37de3e..c5714a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -588,49 +588,55 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private List<Task<? extends Serializable>> addUpdateReplStateTasks(
boolean isDatabaseLoad,
- UpdatedMetaDataTracker updatedMetadata,
+ UpdatedMetaDataTracker updatedMetaDataTracker,
List<Task<? extends Serializable>> importTasks) throws SemanticException {
- String replState = updatedMetadata.getReplicationState();
- String dbName = updatedMetadata.getDatabase();
- String tableName = updatedMetadata.getTable();
-
- // If no import tasks generated by the event or no table updated for table level load, then no
- // need to update the repl state to any object.
- if (importTasks.isEmpty() || (!isDatabaseLoad && (tableName == null))) {
- LOG.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+ // If no import tasks generated by the event then no need to update the repl state to any object.
+ if (importTasks.isEmpty()) {
+ LOG.debug("No objects need update of repl state: 0 import tasks");
return importTasks;
}
// Create a barrier task for dependency collection of import tasks
- Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
- // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
- for (Task<? extends Serializable> t : importTasks){
- t.addDependentTask(barrierTask);
- LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
- t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
- }
+ Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
List<Task<? extends Serializable>> tasks = new ArrayList<>();
Task<? extends Serializable> updateReplIdTask;
// If any partition is updated, then update repl state in partition object
- for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
- updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
- tasks.add(updateReplIdTask);
+ for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) {
+ String replState = updateMetaData.getReplState();
+ String dbName = updateMetaData.getDbName();
+ String tableName = updateMetaData.getTableName();
+ // If any partition is updated, then update repl state in partition object
+ for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
+ updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ if (tableName != null) {
+ // If any table/partition is updated, then update repl state in table object
+ updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
+
+ // For table level load, need not update replication state for the database
+ if (isDatabaseLoad) {
+ // If any table/partition is updated, then update repl state in db object
+ updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+ tasks.add(updateReplIdTask);
+ }
}
- if (tableName != null) {
- // If any table/partition is updated, then update repl state in table object
- updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
- tasks.add(updateReplIdTask);
+ if (tasks.isEmpty()) {
+ LOG.debug("No objects need update of repl state: 0 update tracker tasks");
+ return importTasks;
}
- // For table level load, need not update replication state for the database
- if (isDatabaseLoad) {
- // If any table/partition is updated, then update repl state in db object
- updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
- tasks.add(updateReplIdTask);
+ // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
+ for (Task<? extends Serializable> t : importTasks){
+ t.addDependentTask(barrierTask);
+ LOG.debug("Added {}:{} as a precursor of barrier task {}:{}",
+ t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
}
// At least one task would have been added to update the repl state
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index e10729e..26db245 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7316,7 +7316,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
try {
if (ctx.getExplainConfig() != null) {
- writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
+ writeId = null; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
} else {
if (isMmTable) {
writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
@@ -7331,6 +7331,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
dest_tab.getDbName(), dest_tab.getTableName());
ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId);
+ if (writeId != null) {
+ ltd.setStmtId(txnMgr.getCurrentStmtId());
+ }
// For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
// deltas and base and leave them up to the cleaner to clean up
boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
@@ -7426,6 +7429,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
throw new SemanticException("Failed to allocate write Id", ex);
}
ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId);
+ if (writeId != null) {
+ ltd.setStmtId(txnMgr.getCurrentStmtId());
+ }
// For the current context for generating File Sink Operator, it is either INSERT INTO or INSERT OVERWRITE.
// So the next line works.
boolean isInsertInto = !qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest);
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index d9483f8..97eb4b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -179,9 +179,23 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
Map<String, String> tblProps = new HashMap<>();
tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+ String location;
+
+ // for temporary tables we set the location to something in the session's scratch dir
+ // it has the same life cycle as the tmp table
+ try {
+ // Generate a unique ID for temp table path.
+ // This path will be fixed for the life of the temp table.
+ Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
+ path = Warehouse.getDnsPath(path, conf);
+ location = path.toString();
+ } catch (MetaException err) {
+ throw new SemanticException("Error while generating temp table path:", err);
+ }
+
CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
false, true, null,
- null, null, null, null,
+ null, location, null, null,
tblProps,
true, //important so we get an exception on name collision
Warehouse.getQualifiedName(exportTable.getTTable()), false);
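The temporary table created for EXPORT of an ACID table now gets an explicit location under the session's temp-table space instead of the default warehouse location, so it shares the temp table's life cycle. A minimal sketch of the location generation, using the same calls as the patch:

import java.util.UUID;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.session.SessionState;

public class TempTableLocationSketch {
  // Generate a unique, fully-qualified path under the session's scratch space;
  // the path stays fixed for the life of the temp table.
  static String tempTableLocation(HiveConf conf) throws MetaException {
    Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
    return Warehouse.getDnsPath(path, conf).toString();
  }
}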
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 7e8d520..9a54c1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -400,7 +400,7 @@ public class CopyUtils {
return result;
}
- private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
+ public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
if (fileInfo.getSubDir() == null) {
return destRoot;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index c0701c5..62d699f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -186,10 +186,6 @@ public class Utils {
return false;
}
- boolean isAcidTable = AcidUtils.isTransactionalTable(tableHandle);
- if (isAcidTable) {
- return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES);
- }
return !tableHandle.isTemporary();
}
return true;
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index db97d7c..f04cd93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -18,9 +18,27 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.fs.FileSystem;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
class CommitTxnHandler extends AbstractEventHandler {
@@ -28,11 +46,116 @@ class CommitTxnHandler extends AbstractEventHandler {
super(event);
}
+ private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+ Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+ FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+ return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+ }
+
+ private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath) throws IOException {
+ // encoded filename/checksum of files, write into _files
+ try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+ for (String file : files) {
+ fileListWriter.write(file + "\n");
+ }
+ }
+ }
+
+ private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
+ List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+ if (fileListArray == null || fileListArray.isEmpty()) {
+ return;
+ }
+
+ Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+ withinContext.replicationSpec.setIsReplace(true);
+ EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+ qlMdTable, qlPtns,
+ withinContext.replicationSpec,
+ withinContext.hiveConf);
+
+ if ((null == qlPtns) || qlPtns.isEmpty()) {
+ Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+ writeDumpFiles(withinContext, fileListArray.get(0), dataPath);
+ } else {
+ for (int idx = 0; idx < qlPtns.size(); idx++) {
+ Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName());
+ writeDumpFiles(withinContext, fileListArray.get(idx), dataPath);
+ }
+ }
+ }
+
+ private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
+ List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+ Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName());
+ Context context = new Context(withinContext);
+ context.setEventRoot(newPath);
+ createDumpFile(context, qlMdTable, qlPtns, fileListArray);
+ }
+
@Override
public void handle(Context withinContext) throws Exception {
LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage());
+ String payload = event.getMessage();
+
+ if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+ CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
+
+ String contextDbName = withinContext.dbName == null ? null :
+ StringUtils.normalizeIdentifier(withinContext.dbName);
+ String contextTableName = withinContext.tableName == null ? null :
+ StringUtils.normalizeIdentifier(withinContext.tableName);
+ List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
+ getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName);
+ int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
+ if (numEntry != 0) {
+ commitTxnMessage.addWriteEventInfo(writeEventInfoList);
+ payload = commitTxnMessage.toString();
+ LOG.debug("payload for commit txn event : " + payload);
+ }
+
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
+ org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null;
+ List<Partition> qlPtns = new ArrayList<>();
+ List<List<String>> filesTobeAdded = new ArrayList<>();
+
+ // The loop below creates a dump directory for each table. It reads through the list of write notification
+ // events, groups the entries per table and builds the lists of files to be replicated. The event directory in
+ // the dump path gets a sub-directory per table, holding the table metadata and the list of files to replicate.
+ // The entries are stored with the (txn id, db name, table name, partition name) combination as the primary key,
+ // so entries for the same table come back together. Only basic table metadata is used during import, so the
+ // latest table metadata need not be dumped.
+ for (int idx = 0; idx < numEntry; idx++) {
+ qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx));
+ if (qlMdTablePrev == null) {
+ qlMdTablePrev = qlMdTable;
+ }
+
+ // one dump directory per table
+ if (!qlMdTablePrev.getCompleteName().equals(qlMdTable.getCompleteName())) {
+ createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
+ qlPtns = new ArrayList<>();
+ filesTobeAdded = new ArrayList<>();
+ qlMdTablePrev = qlMdTable;
+ }
+
+ if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) {
+ qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable,
+ commitTxnMessage.getPartitionObj(idx)));
+ }
+
+ filesTobeAdded.add(Lists.newArrayList(
+ ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx))));
+ }
+
+ //Dump last table in the list
+ if (qlMdTablePrev != null) {
+ createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
+ }
+ }
+
DumpMetaData dmd = withinContext.createDmd(this);
- dmd.setPayload(event.getMessage());
+ dmd.setPayload(payload);
dmd.write();
}
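handle() reads the write events recorded for the committed transaction and creates one dump sub-directory per table; since the entries come back grouped by table (per the primary-key comment above), a single pass that flushes whenever the table changes is enough. Below is a standalone sketch of that flush-on-change pattern, with plain strings standing in for the metastore table objects and a callback standing in for createDumpFileForTable().

import java.util.ArrayList;
import java.util.List;

public class GroupWriteEventsSketch {
  // Stand-in for createDumpFileForTable() in the patch.
  interface DumpWriter {
    void dump(String tableName, List<String> files);
  }

  // Entries for the same table are adjacent, so start a new group whenever the
  // table name changes and flush the last group at the end; this is the same
  // shape as the loop in CommitTxnHandler.handle().
  static void dumpPerTable(List<String> tableNames, List<String> fileLists, DumpWriter writer) {
    String prevTable = null;
    List<String> pending = new ArrayList<>();
    for (int idx = 0; idx < tableNames.size(); idx++) {
      String table = tableNames.get(idx);
      if (prevTable != null && !prevTable.equals(table)) {
        writer.dump(prevTable, pending);   // one dump directory per table
        pending = new ArrayList<>();
      }
      prevTable = table;
      pending.add(fileLists.get(idx));
    }
    if (prevTable != null) {
      writer.dump(prevTable, pending);     // flush the last table
    }
  }
}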
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index c0fa7b2..ec35f4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -35,18 +35,37 @@ public interface EventHandler {
DumpType dumpType();
class Context {
- final Path eventRoot, cmRoot;
+ Path eventRoot;
+ final Path cmRoot;
final Hive db;
final HiveConf hiveConf;
final ReplicationSpec replicationSpec;
+ final String dbName;
+ final String tableName;
public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
- ReplicationSpec replicationSpec) {
+ ReplicationSpec replicationSpec, String dbName, String tableName) {
this.eventRoot = eventRoot;
this.cmRoot = cmRoot;
this.db = db;
this.hiveConf = hiveConf;
this.replicationSpec = replicationSpec;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ }
+
+ public Context(Context other) {
+ this.eventRoot = other.eventRoot;
+ this.cmRoot = other.cmRoot;
+ this.db = other.db;
+ this.hiveConf = other.hiveConf;
+ this.replicationSpec = other.replicationSpec;
+ this.dbName = other.dbName;
+ this.tableName = other.tableName;
+ }
+
+ public void setEventRoot(Path eventRoot) {
+ this.eventRoot = eventRoot;
}
DumpMetaData createDmd(EventHandler eventHandler) {
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 5ac3af0..cf3822a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.parse.EximUtil;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -53,6 +54,9 @@ class InsertHandler extends AbstractEventHandler {
return;
}
+ // In case of ACID tables, insert event should not have fired.
+ assert(!AcidUtils.isTransactionalTable(qlMdTable));
+
List<Partition> qlPtns = null;
if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
index d76f419..614e071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -17,7 +17,10 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.load;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hive.common.util.HiveStringUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Map;
import java.util.List;
@@ -25,52 +28,113 @@ import java.util.List;
* Utility class to help track and return the metadata which are updated by repl load
*/
public class UpdatedMetaDataTracker {
- private String replState;
- private String dbName;
- private String tableName;
- private List<Map <String, String>> partitionsList;
- public UpdatedMetaDataTracker() {
- this.replState = null;
- this.dbName = null;
- this.tableName = null;
- this.partitionsList = new ArrayList<>();
+ /**
+ * Utility class to store replication state of a table.
+ */
+ public static class UpdateMetaData {
+ private String replState;
+ private String dbName;
+ private String tableName;
+ private List<Map <String, String>> partitionsList;
+
+ UpdateMetaData(String replState, String dbName, String tableName, Map <String, String> partSpec) {
+ this.replState = replState;
+ this.dbName = dbName;
+ this.tableName = tableName;
+ this.partitionsList = new ArrayList<>();
+ if (partSpec != null) {
+ this.partitionsList.add(partSpec);
+ }
+ }
+
+ public String getReplState() {
+ return replState;
+ }
+
+ public String getDbName() {
+ return dbName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public List<Map <String, String>> getPartitionsList() {
+ return partitionsList;
+ }
+
+ public void addPartition(Map<String, String> partSpec) {
+ this.partitionsList.add(partSpec);
+ }
}
- public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
- this.replState = other.replState;
- this.dbName = other.dbName;
- this.tableName = other.tableName;
- this.partitionsList = other.getPartitions();
+ private List<UpdateMetaData> updateMetaDataList;
+ private Map<String, Integer> updateMetaDataMap;
+
+ public UpdatedMetaDataTracker() {
+ updateMetaDataList = new ArrayList<>();
+ updateMetaDataMap = new HashMap<>();
}
- public void set(String replState, String dbName, String tableName, Map <String, String> partSpec) {
- this.replState = replState;
- this.dbName = dbName;
- this.tableName = tableName;
- if (partSpec != null) {
- addPartition(partSpec);
+ public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+ int size = updateMetaDataList.size();
+ for (UpdateMetaData updateMetaDataOther : other.updateMetaDataList) {
+ String key = getKey(normalizeIdentifier(updateMetaDataOther.getDbName()),
+ normalizeIdentifier(updateMetaDataOther.getTableName()));
+ Integer idx = updateMetaDataMap.get(key);
+ if (idx == null) {
+ updateMetaDataList.add(updateMetaDataOther);
+ updateMetaDataMap.put(key, size++);
+ } else if (updateMetaDataOther.partitionsList != null && updateMetaDataOther.partitionsList.size() != 0) {
+ UpdateMetaData updateMetaData = updateMetaDataList.get(idx);
+ for (Map<String, String> partSpec : updateMetaDataOther.partitionsList) {
+ updateMetaData.addPartition(partSpec);
+ }
+ }
}
}
- public void addPartition(Map <String, String> partSpec) {
- partitionsList.add(partSpec);
+ public void set(String replState, String dbName, String tableName, Map <String, String> partSpec)
+ throws SemanticException {
+ if (dbName == null) {
+ throw new SemanticException("db name cannot be null");
+ }
+ String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+ Integer idx = updateMetaDataMap.get(key);
+ if (idx == null) {
+ updateMetaDataList.add(new UpdateMetaData(replState, dbName, tableName, partSpec));
+ updateMetaDataMap.put(key, updateMetaDataList.size() - 1);
+ } else {
+ updateMetaDataList.get(idx).addPartition(partSpec);
+ }
}
- public String getReplicationState() {
- return replState;
+ public void addPartition(String dbName, String tableName, Map <String, String> partSpec) throws SemanticException {
+ if (dbName == null) {
+ throw new SemanticException("db name cannot be null");
+ }
+ String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+ Integer idx = updateMetaDataMap.get(key);
+ if (idx == null) {
+ throw new SemanticException("add partition to metadata map failed as list is not yet set for table : " + key);
+ }
+ updateMetaDataList.get(idx).addPartition(partSpec);
}
- public String getDatabase() {
- return dbName;
+ public List<UpdateMetaData> getUpdateMetaDataList() {
+ return updateMetaDataList;
}
- public String getTable() {
- return tableName;
+ private String getKey(String dbName, String tableName) {
+ if (tableName == null) {
+ return dbName + ".*";
+ }
+ return dbName + "." + tableName;
}
- public List<Map <String, String>> getPartitions() {
- return partitionsList;
+ private String normalizeIdentifier(String name) {
+ return name == null ? null : HiveStringUtils.normalizeIdentifier(name);
}
}
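The tracker now keys its entries by normalized db.table name, so repeated set() calls for the same table merge their partition specs into a single UpdateMetaData entry instead of overwriting one shared state. A usage sketch against the API shown above (database, table, and partition values are made up):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

public class TrackerUsageSketch {
  public static void main(String[] args) throws SemanticException {
    UpdatedMetaDataTracker tracker = new UpdatedMetaDataTracker();

    Map<String, String> part1 = new HashMap<>();
    part1.put("ds", "2018-07-01");
    Map<String, String> part2 = new HashMap<>();
    part2.put("ds", "2018-07-02");

    // Same table; identifiers are normalized, so the differing case does not create a second key.
    tracker.set("100", "Sales", "Orders", part1);
    tracker.set("101", "sales", "orders", part2);

    // One entry with two partition specs, not two entries.
    UpdatedMetaDataTracker.UpdateMetaData md = tracker.getUpdateMetaDataList().get(0);
    System.out.println(tracker.getUpdateMetaDataList().size());  // 1
    System.out.println(md.getPartitionsList().size());           // 2
  }
}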
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index afc7426..d3f3306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -48,7 +48,12 @@ public class AbortTxnHandler extends AbstractMessageHandler {
msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()),
context.hiveConf
);
- updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+ // For a warehouse-level dump, don't update the database metadata as we don't know which database this txn belongs to.
+ // Anyway, if this event gets executed again, it is handled safely.
+ if (!context.isDbNameEmpty()) {
+ updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+ }
context.log.debug("Added Abort txn task : {}", abortTxnTask.getId());
return Collections.singletonList(abortTxnTask);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
index 9bdbf64..63f2577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -52,7 +52,7 @@ public class AllocWriteIdHandler extends AbstractMessageHandler {
.getTableName());
// Repl policy should be created based on the table name in context.
- ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(dbName, context.tableName), dbName, tableName,
+ ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), dbName, tableName,
ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec());
Task<? extends Serializable> allocWriteIdTask = TaskFactory.get(work, context.hiveConf);
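The one-line change makes the repl policy argument consistent with the other transaction handlers: it is built from the names in the load context (which reflect the REPL LOAD command), while dbName/tableName keep pointing at the names from the event message. A hedged sketch of the policy-string shape this call is assumed to produce; replPolicy() below is a hypothetical stand-in, the real logic lives in HiveUtils.getReplPolicy():

public class ReplPolicySketch {
  // Assumed format: "<db>.*" for a database-level policy, "<db>.<table>" for a table-level one.
  static String replPolicy(String dbName, String tableName) {
    if (dbName == null || dbName.isEmpty()) {
      return null;                                  // warehouse-level load: no single policy
    }
    if (tableName == null || tableName.isEmpty()) {
      return dbName.toLowerCase() + ".*";
    }
    return dbName.toLowerCase() + "." + tableName.toLowerCase();
  }

  public static void main(String[] args) {
    System.out.println(replPolicy("sales", null));       // sales.*
    System.out.println(replPolicy("Sales", "Orders"));   // sales.orders
  }
}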
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index d25102e..87a6ff6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -17,7 +17,12 @@
*/
package org.apache.hadoop.hive.ql.parse.repl.load.message;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -25,7 +30,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import java.io.Serializable;
-import java.util.Collections;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -35,20 +40,75 @@ import java.util.List;
public class CommitTxnHandler extends AbstractMessageHandler {
@Override
public List<Task<? extends Serializable>> handle(Context context)
- throws SemanticException {
+ throws SemanticException {
if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
context.log.error("Cannot load transaction events as acid is not enabled");
throw new SemanticException("Cannot load transaction events as acid is not enabled");
}
CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());
- Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
- new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
- msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()),
- context.hiveConf
- );
- updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+ int numEntry = (msg.getTables() == null ? 0 : msg.getTables().size());
+ List<Task<? extends Serializable>> tasks = new ArrayList<>();
+ String dbName = context.dbName;
+ String tableNamePrev = null;
+ String tblName = context.tableName;
+
+ ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName,
+ context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec());
+
+ if (numEntry > 0) {
+ context.log.debug("Commit txn handler for txnid " + msg.getTxnId() + " databases : " + msg.getDatabases() +
+ " tables : " + msg.getTables() + " partitions : " + msg.getPartitions() + " files : " +
+ msg.getFilesList() + " write ids : " + msg.getWriteIds());
+ }
+
+ for (int idx = 0; idx < numEntry; idx++) {
+ String actualTblName = msg.getTables().get(idx);
+ String actualDBName = msg.getDatabases().get(idx);
+ String completeName = Table.getCompleteName(actualDBName, actualTblName);
+
+ // One import task per table. Events for the same table are kept together in one dump directory during dump and
+ // are grouped together in the commit txn message.
+ if (tableNamePrev == null || !(completeName.equals(tableNamePrev))) {
+ // The data location is created by the source, so the location should be derived from the table name in the message.
+ Path location = HiveUtils.getDumpPath(new Path(context.location), actualDBName, actualTblName);
+ tblName = context.isTableNameEmpty() ? actualTblName : context.tableName;
+ // For a warehouse-level dump, use the db name from the write event.
+ dbName = (context.isDbNameEmpty() ? actualDBName : context.dbName);
+ Context currentContext = new Context(context, dbName, tblName);
+ currentContext.setLocation(location.toUri().toString());
+
+ // Piggybacking on the Import logic for now.
+ TableHandler tableHandler = new TableHandler();
+ tasks.addAll((tableHandler.handle(currentContext)));
+ readEntitySet.addAll(tableHandler.readEntities());
+ writeEntitySet.addAll(tableHandler.writeEntities());
+ getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata());
+ tableNamePrev = completeName;
+ }
+
+ try {
+ WriteEventInfo writeEventInfo = new WriteEventInfo(msg.getWriteIds().get(idx),
+ dbName, tblName, msg.getFiles(idx));
+ if (msg.getPartitions().get(idx) != null && !msg.getPartitions().get(idx).isEmpty()) {
+ writeEventInfo.setPartition(msg.getPartitions().get(idx));
+ }
+ work.addWriteEventInfo(writeEventInfo);
+ } catch (Exception e) {
+ throw new SemanticException("Failed to extract write event info from commit txn message : " + e.getMessage());
+ }
+ }
+
+ Task<ReplTxnWork> commitTxnTask = TaskFactory.get(work, context.hiveConf);
+
+ // For a warehouse-level dump, don't update the database metadata as we don't know which database this txn belongs to.
+ // Anyway, if this event gets executed again, it is handled safely.
+ if (!context.isDbNameEmpty()) {
+ updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+ }
context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
- return Collections.singletonList(commitTxnTask);
+ DAGTraversal.traverse(tasks, new AddDependencyToLeaves(commitTxnTask));
+ return tasks;
}
}
+
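The handler now returns the per-table import tasks as the roots of the plan and wires the single REPL_COMMIT_TXN task behind every leaf of that task tree, so the transaction is committed on the target only after all of the copied data is in place. A stand-alone sketch of that wiring idea; the Node type below is a throwaway stand-in, not Hive's Task/DAGTraversal/AddDependencyToLeaves classes:

import java.util.ArrayList;
import java.util.List;

public class AddDependencyToLeavesSketch {
  static class Node {
    final String name;
    final List<Node> children = new ArrayList<>();
    Node(String name) { this.name = name; }
  }

  // Depth-first walk: every leaf of the task tree gets the extra task appended as a child.
  static void addToLeaves(Node root, Node dependency) {
    if (root.children.isEmpty()) {
      root.children.add(dependency);
      return;
    }
    for (Node child : root.children) {
      addToLeaves(child, dependency);
    }
  }

  public static void main(String[] args) {
    Node importT1 = new Node("import table t1");
    Node importT2 = new Node("import table t2");
    Node moveT2 = new Node("move files for t2");
    importT2.children.add(moveT2);

    Node commit = new Node("repl commit txn");
    addToLeaves(importT1, commit);
    addToLeaves(importT2, commit);

    System.out.println(importT1.children.get(0).name);  // repl commit txn
    System.out.println(moveT2.children.get(0).name);    // repl commit txn
  }
}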
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index ef4a901..cdf51dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -46,8 +46,8 @@ public interface MessageHandler {
UpdatedMetaDataTracker getUpdatedMetadata();
class Context {
- public String dbName;
- public final String tableName, location;
+ public String location;
+ public final String tableName, dbName;
public final Task<? extends Serializable> precursor;
public DumpMetaData dmd;
final HiveConf hiveConf;
@@ -101,5 +101,9 @@ public interface MessageHandler {
public HiveTxnManager getTxnMgr() {
return nestedContext.getHiveTxnManager();
}
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index 190e021..5dcc44e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -47,7 +47,12 @@ public class OpenTxnHandler extends AbstractMessageHandler {
msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()),
context.hiveConf
);
- updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+ // For a warehouse-level dump, don't update the database metadata as we don't know which database this txn belongs to.
+ // Anyway, if this event gets executed again, it is handled safely.
+ if (!context.isDbNameEmpty()) {
+ updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+ }
context.log.debug("Added Open txn task : {}", openTxnTask.getId());
return Collections.singletonList(openTxnTask);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9a1e3a1..47a56d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -40,6 +40,7 @@ public class MoveWork implements Serializable {
private LoadMultiFilesDesc loadMultiFilesWork;
private boolean checkFileFormat;
private boolean srcLocal;
+ private boolean needCleanTarget;
/**
* ReadEntitites that are passed to the hooks.
@@ -63,6 +64,7 @@ public class MoveWork implements Serializable {
private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
this.inputs = inputs;
this.outputs = outputs;
+ this.needCleanTarget = true;
}
public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
@@ -93,6 +95,7 @@ public class MoveWork implements Serializable {
srcLocal = o.isSrcLocal();
inputs = o.getInputs();
outputs = o.getOutputs();
+ needCleanTarget = o.needCleanTarget;
}
@Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -153,5 +156,12 @@ public class MoveWork implements Serializable {
public void setSrcLocal(boolean srcLocal) {
this.srcLocal = srcLocal;
}
-
+
+ public boolean isNeedCleanTarget() {
+ return needCleanTarget;
+ }
+
+ public void setNeedCleanTarget(boolean needCleanTarget) {
+ this.needCleanTarget = needCleanTarget;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
index 3c853c9..a6ab836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.plan;
import java.io.Serializable;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -40,6 +42,7 @@ public class ReplTxnWork implements Serializable {
private List<Long> txnIds;
private List<TxnToWriteId> txnToWriteIdList;
private ReplicationSpec replicationSpec;
+ private List<WriteEventInfo> writeEventInfos;
/**
* OperationType.
@@ -60,6 +63,7 @@ public class ReplTxnWork implements Serializable {
this.replPolicy = replPolicy;
this.txnToWriteIdList = txnToWriteIdList;
this.replicationSpec = replicationSpec;
+ this.writeEventInfos = null;
}
public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
@@ -86,6 +90,13 @@ public class ReplTxnWork implements Serializable {
this.operation = type;
}
+ public void addWriteEventInfo(WriteEventInfo writeEventInfo) {
+ if (this.writeEventInfos == null) {
+ this.writeEventInfos = new ArrayList<>();
+ }
+ this.writeEventInfos.add(writeEventInfo);
+ }
+
public List<Long> getTxnIds() {
return txnIds;
}
@@ -121,4 +132,8 @@ public class ReplTxnWork implements Serializable {
public ReplicationSpec getReplicationSpec() {
return replicationSpec;
}
+
+ public List<WriteEventInfo> getWriteEventInfos() {
+ return writeEventInfos;
+ }
}