Posted to commits@hive.apache.org by dj...@apache.org on 2018/05/08 18:42:18 UTC

[13/58] [abbrv] hive git commit: HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
index 7edfc6a..fb8c4ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/HiveWrapper.java
@@ -36,9 +36,13 @@ public class HiveWrapper {
   private final Tuple.Function<ReplicationSpec> functionForSpec;
 
   public HiveWrapper(Hive db, String dbName) {
+    this(db, dbName, 0);
+  }
+
+  public HiveWrapper(Hive db, String dbName, long lastReplId) {
     this.dbName = dbName;
     this.db = db;
-    this.functionForSpec = new BootStrapReplicationSpecFunction(db);
+    this.functionForSpec = new BootStrapReplicationSpecFunction(db, lastReplId);
   }
 
   public Tuple<org.apache.hadoop.hive.metastore.api.Function> function(final String name)
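
The new overload threads a last-replication-event id into the bootstrap
ReplicationSpec function; the old two-argument constructor keeps its behavior
by delegating with 0. A minimal usage sketch, assuming a Hive handle "db" and
an event id captured at the start of a bootstrap dump (both hypothetical):

    // Bootstrap dump: pin the ReplicationSpec to the event id captured when
    // the dump began, so every dumped object carries the same id.
    long bootstrapLastReplId = 230L;                        // illustrative value
    HiveWrapper wrapper = new HiveWrapper(db, "sales_db", bootstrapLastReplId);

    // Existing callers are unchanged; this is equivalent to lastReplId = 0.
    HiveWrapper legacy = new HiveWrapper(db, "sales_db");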

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 5844f3d..c960d2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
@@ -97,11 +98,12 @@ class PartitionExport {
         String partitionName = partition.getName();
         String threadName = Thread.currentThread().getName();
         LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
-        Path fromPath = partition.getDataLocation();
         try {
          // this is the data copy
+          List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
+                  forReplicationSpec, hiveConf);
           Path rootDataDumpDir = paths.partitionExportDir(partitionName);
-          new FileOperations(fromPath, rootDataDumpDir, distCpDoAsUser, hiveConf)
+          new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf)
                   .export(forReplicationSpec);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
         } catch (Exception e) {
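
The worker thread no longer hands FileOperations a single partition root; it
first expands the location into the list of data paths that are valid for this
dump, as the hunk above shows. Condensed, with the surrounding fields from the
diff (paths, forReplicationSpec, distCpDoAsUser, hiveConf) assumed in scope:

    // For an ACID partition this expands to only the base/delta directories
    // covered by the dump's write-id list; for a non-ACID partition it is
    // just the partition root, preserving the old single-path behavior.
    List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
            forReplicationSpec, hiveConf);
    new FileOperations(dataPathList, paths.partitionExportDir(partition.getName()),
            distCpDoAsUser, hiveConf).export(forReplicationSpec);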

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index abb2e88..d0aeee5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -45,6 +44,7 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -149,9 +149,11 @@ public class TableExport {
         }
         new PartitionExport(paths, partitions, distCpDoAsUser, conf).write(replicationSpec);
       } else {
-        Path fromPath = tableSpec.tableHandle.getDataLocation();
+        List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
+                replicationSpec, conf);
+
         // this is the data copy
-        new FileOperations(fromPath, paths.dataExportDir(), distCpDoAsUser, conf)
+        new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf)
             .export(replicationSpec);
       }
     } catch (Exception e) {
@@ -160,12 +162,6 @@ public class TableExport {
   }
 
   private boolean shouldExport() {
-    // Note: this is a temporary setting that is needed because replication does not support
-    //       ACID or MM tables at the moment. It will eventually be removed.
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES)
-        && AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
-      return true;
-    }
     return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf);
   }
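
The removed block was the stop-gap that force-included ACID tables whenever
REPL_DUMP_INCLUDE_ACID_TABLES was set; with bootstrap support in place the
decision collapses into the shared predicate, leaving (presumably, since this
hunk does not show Utils.shouldReplicate) all ACID/MM checks in one spot:

    private boolean shouldExport() {
      return Utils.shouldReplicate(replicationSpec, tableSpec.tableHandle, conf);
    }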
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index d2bdde9..14572ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -198,4 +199,13 @@ public class Utils {
     }
     return shouldReplicate(replicationSpec, table, hiveConf);
   }
+
+  static List<Path> getDataPathList(Path fromPath, ReplicationSpec replicationSpec, HiveConf conf)
+          throws IOException {
+    if (replicationSpec.isTransactionalTableDump()) {
+      return AcidUtils.getValidDataPaths(fromPath, conf, replicationSpec.getValidWriteIdList());
+    } else {
+      return Collections.singletonList(fromPath);
+    }
+  }
 }
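
A hedged illustration of the helper's two behaviors; the directory names are
invented, and the actual filtering is delegated to AcidUtils.getValidDataPaths
against the dump's ValidWriteIdList:

    Path loc = new Path("/warehouse/db1.db/t1");            // illustrative
    List<Path> dataPaths = Utils.getDataPathList(loc, replicationSpec, conf);
    // Non-transactional dump:   [ /warehouse/db1.db/t1 ]
    // Transactional-table dump: [ /warehouse/db1.db/t1/base_0000005,
    //                             /warehouse/db1.db/t1/delta_0000006_0000006_0000, ... ]
    // i.e. only directories whose write ids are committed according to
    // replicationSpec.getValidWriteIdList(); aborted/open writes are skipped.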

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 866d351..690498f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -39,19 +40,23 @@ import java.util.List;
 
 public class FileOperations {
   private static Logger logger = LoggerFactory.getLogger(FileOperations.class);
-  private final Path dataFileListPath;
+  private final List<Path> dataPathList;
   private final Path exportRootDataDir;
   private final String distCpDoAsUser;
   private HiveConf hiveConf;
   private final FileSystem dataFileSystem, exportFileSystem;
 
-  public FileOperations(Path dataFileListPath, Path exportRootDataDir,
+  public FileOperations(List<Path> dataPathList, Path exportRootDataDir,
                         String distCpDoAsUser, HiveConf hiveConf) throws IOException {
-    this.dataFileListPath = dataFileListPath;
+    this.dataPathList = dataPathList;
     this.exportRootDataDir = exportRootDataDir;
     this.distCpDoAsUser = distCpDoAsUser;
     this.hiveConf = hiveConf;
-    dataFileSystem = dataFileListPath.getFileSystem(hiveConf);
+    if ((dataPathList != null) && !dataPathList.isEmpty()) {
+      dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+    } else {
+      dataFileSystem = null;
+    }
     exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
   }
 
@@ -67,13 +72,15 @@ public class FileOperations {
    * This writes the actual data in the exportRootDataDir from the source.
    */
   private void copyFiles() throws IOException, LoginException {
-    FileStatus[] fileStatuses =
-        LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath);
-    List<Path> srcPaths = new ArrayList<>();
-    for (FileStatus fileStatus : fileStatuses) {
-      srcPaths.add(fileStatus.getPath());
+    for (Path dataPath : dataPathList) {
+      FileStatus[] fileStatuses =
+              LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataPath);
+      List<Path> srcPaths = new ArrayList<>();
+      for (FileStatus fileStatus : fileStatuses) {
+        srcPaths.add(fileStatus.getPath());
+      }
+      new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
     }
-    new CopyUtils(distCpDoAsUser, hiveConf).doCopy(exportRootDataDir, srcPaths);
   }
 
   /**
@@ -83,31 +90,57 @@ public class FileOperations {
    */
   private void exportFilesAsList() throws SemanticException, IOException {
     try (BufferedWriter writer = writer()) {
-      FileStatus[] fileStatuses =
-          LoadSemanticAnalyzer.matchFilesOrDir(dataFileSystem, dataFileListPath);
-      for (FileStatus fileStatus : fileStatuses) {
-        writer.write(encodedUri(fileStatus));
+      for (Path dataPath : dataPathList) {
+        writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+      }
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(fileStatus, encodedSubDirs));
         writer.newLine();
       }
     }
   }
 
+  private FileStatus[] listFilesInDir(Path path) throws IOException {
+    return dataFileSystem.listStatus(path, p -> {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    });
+  }
+
   private BufferedWriter writer() throws IOException {
     Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
     if (exportFileSystem.exists(exportToFile)) {
       throw new IllegalArgumentException(
              exportToFile.toString() + " already exists and can't export data from path(dir) "
-              + dataFileListPath);
+              + dataPathList);
     }
-    logger.debug("exporting data files in dir : " + dataFileListPath + " to " + exportToFile);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
     return new BufferedWriter(
         new OutputStreamWriter(exportFileSystem.create(exportToFile))
     );
   }
 
-  private String encodedUri(FileStatus fileStatus) throws IOException {
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(FileStatus fileStatus, String encodedSubDir) throws IOException {
     Path currentDataFilePath = fileStatus.getPath();
     String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
-    return ReplChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum);
+    return ReplChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
   }
 }
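
Two changes work in concert here. The listing now recurses into
sub-directories (the base_N/delta_M_N layout of ACID tables) while skipping
names that start with "_" or "." at every level, and each manifest line gains
the sub-directory the file was found under so the load side can recreate the
same layout. The filter mirrors listFilesInDir above; the manifest shape shown
is an assumption inferred from encodeFileUri's arguments, not from this patch:

    // Hadoop's FileSystem.listStatus(Path, PathFilter) applies the same
    // visibility rule at every level of the recursive walk:
    FileStatus[] children = dataFileSystem.listStatus(dataPath, p -> {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    });

    // Presumed _files entries for a transactional table (illustrative):
    //   hdfs://ns/wh/t1/delta_0000001_0000001_0000/bucket_00000#<checksum>#delta_0000001_0000001_0000
    //   hdfs://ns/wh/t1/base_0000002/bucket_00000#<checksum>#base_0000002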

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
index f72f430..b68e887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FunctionSerializer.java
@@ -56,7 +56,7 @@ public class FunctionSerializer implements JsonWriter.Serializer {
         FileSystem fileSystem = inputPath.getFileSystem(hiveConf);
         Path qualifiedUri = PathBuilder.fullyQualifiedHDFSUri(inputPath, fileSystem);
         String checkSum = ReplChangeManager.checksumFor(qualifiedUri, fileSystem);
-        String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum);
+        String newFileUri = ReplChangeManager.encodeFileUri(qualifiedUri.toString(), checkSum, null);
         resourceUris.add(new ResourceUri(uri.getResourceType(), newFileUri));
       } else {
         resourceUris.add(uri);
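
This is the matching call-site update for the widened encodeFileUri signature:
function binaries live directly at their registered URI, so there is no ACID
sub-directory to record and null is passed for the new third argument.

    // Function resources carry no base/delta component, hence the null subdir:
    String newFileUri = ReplChangeManager.encodeFileUri(
        qualifiedUri.toString(), checkSum, null);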

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index 3c28f84..afc7426 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
-import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
index ef51396..9bdbf64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -18,12 +18,12 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
-import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 1274213..d25102e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
-import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index a502117..190e021 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
-import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index b137cd9..ef7325f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -348,4 +348,15 @@ public class ImportTableDesc {
     }
     return TableType.MANAGED_TABLE;
   }
+
+  public Table toTable(HiveConf conf) throws Exception {
+    switch (getDescType()) {
+      case TABLE:
+        return createTblDesc.toTable(conf);
+      case VIEW:
+        return createViewDesc.toTable(conf);
+      default:
+        return null;
+    }
+  }
 }
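
toTable lets the load path materialize an in-memory Table from the captured
CreateTableDesc or CreateViewDesc before any task runs. A hypothetical usage
sketch (the surrounding variables are assumptions, not from this patch):

    // e.g. pick an ACID-aware import strategy from the table definition
    // alone, before touching the metastore:
    Table table = importTableDesc.toTable(hiveConf);
    if (table != null && AcidUtils.isTransactionalTable(table)) {
      // take the transactional import path (illustrative)
    }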

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
new file mode 100644
index 0000000..3c853c9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.plan;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * ReplTxnWork.
+ * Used for replaying transaction-related events.
+ */
+@Explain(displayName = "Replication Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class ReplTxnWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private String replPolicy;
+  private String dbName;
+  private String tableName;
+  private List<String> partNames;
+  private String validWriteIdList;
+  private List<Long> txnIds;
+  private List<TxnToWriteId> txnToWriteIdList;
+  private ReplicationSpec replicationSpec;
+
+  /**
+   * OperationType.
+   * Different kind of events supported for replaying.
+   */
+  public enum OperationType {
+    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID, REPL_WRITEID_STATE
+  }
+
+  OperationType operation;
+
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
+    this.txnIds = txnIds;
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.operation = type;
+    this.replPolicy = replPolicy;
+    this.txnToWriteIdList = txnToWriteIdList;
+    this.replicationSpec = replicationSpec;
+  }
+
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+                     ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec);
+  }
+
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId,
+                     OperationType type, ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec);
+  }
+
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type,
+                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec);
+  }
+
+  public ReplTxnWork(String dbName, String tableName, List<String> partNames,
+                     String validWriteIdList, OperationType type) {
+    this.dbName = dbName;
+    this.tableName = tableName;
+    this.partNames = partNames;
+    this.validWriteIdList = validWriteIdList;
+    this.operation = type;
+  }
+
+  public List<Long> getTxnIds() {
+    return txnIds;
+  }
+
+  public String getDbName() {
+    return dbName;
+  }
+
+  public String getTableName()  {
+    return ((tableName == null) || tableName.isEmpty()) ? null : tableName;
+  }
+
+  public String getReplPolicy() {
+    return replPolicy;
+  }
+
+  public List<String> getPartNames() {
+    return partNames;
+  }
+
+  public String getValidWriteIdList() {
+    return validWriteIdList;
+  }
+
+  public OperationType getOperationType() {
+    return operation;
+  }
+
+  public List<TxnToWriteId> getTxnToWriteIdList() {
+    return txnToWriteIdList;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+}
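
ReplTxnWork is the serializable payload behind the Repl*TxnHandler import moves
above: each replayed transaction event becomes one of these, tagged with an
OperationType. A hypothetical sketch of a handler turning an open-txn event
into a task (txnIds and the other locals are assumed in scope; the handlers in
this commit import Task and TaskFactory for exactly this wiring):

    // Replay an OPEN_TXN event on the target warehouse:
    ReplTxnWork work = new ReplTxnWork(replPolicy, dbName, null /* no table */,
        txnIds, ReplTxnWork.OperationType.REPL_OPEN_TXN, replicationSpec);
    Task<? extends Serializable> openTxnTask = TaskFactory.get(work, hiveConf);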