Posted to commits@falcon.apache.org by ba...@apache.org on 2016/04/07 00:16:51 UTC

falcon git commit: FALCON-1880 To support TDE encryption, add --skipcrccheck to distcp options for HiveDR.

Repository: falcon
Updated Branches:
  refs/heads/master fa66c17e9 -> c404da28c


FALCON-1880 To support TDE encryption, add --skipcrccheck to distcp options for HiveDR.

Add --skipcrccheck to distcp options for HiveDR.
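
Background: with HDFS Transparent Data Encryption (TDE), checksums are computed
over the encrypted block data, so identical plaintext stored in two different
encryption zones yields different CRCs and DistCp's post-copy CRC comparison
fails. Skipping the CRC check (in combination with -update) is the documented
way to copy across encryption zones. A minimal, self-contained sketch of the
DistCp options this patch wires up (Hadoop 2.x API; the staging paths are
hypothetical):

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.tools.DistCp;
    import org.apache.hadoop.tools.DistCpOptions;

    public class TdeAwareCopySketch {
        public static void main(String[] args) throws Exception {
            DistCpOptions options = new DistCpOptions(
                    Arrays.asList(new Path("hdfs://sourceNN/apps/falcon/staging/drJob")),
                    new Path("hdfs://targetNN/apps/falcon/staging/drJob"));
            options.setSyncFolder(true); // -update: preserve relative paths under the target root
            options.setSkipCRC(true);    // --skipcrccheck: CRCs differ across TDE encryption zones
            options.setBlocking(true);
            new DistCp(new Configuration(), options).execute();
        }
    }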

Author: bvellanki <bv...@hortonworks.com>

Reviewers: Peeyush Bishnoi <pe...@apache.org>, Ajay Yadava <aj...@gmail.com>, Venkatesan Ramachandran <Me...@gmail.com>

Closes #82 from bvellanki/master


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/c404da28
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/c404da28
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/c404da28

Branch: refs/heads/master
Commit: c404da28cc5eeefa158ea5a2aca38b4b2cf731ee
Parents: fa66c17
Author: bvellanki <bv...@hortonworks.com>
Authored: Wed Apr 6 15:16:36 2016 -0700
Committer: bvellanki <bv...@hortonworks.com>
Committed: Wed Apr 6 15:16:36 2016 -0700

----------------------------------------------------------------------
 addons/hivedr/pom.xml                           |  6 +++
 .../java/org/apache/falcon/hive/HiveDRArgs.java |  3 ++
 .../org/apache/falcon/hive/HiveDROptions.java   |  7 ++-
 .../java/org/apache/falcon/hive/HiveDRTool.java |  4 +-
 .../org/apache/falcon/hive/util/EventUtils.java | 51 ++++++++++----------
 .../hive-disaster-recovery-secure-workflow.xml  |  6 +++
 .../hive-disaster-recovery-secure.properties    |  2 +
 .../recipe/HiveReplicationRecipeTool.java       |  4 ++
 .../HiveReplicationRecipeToolOptions.java       |  1 +
 docs/src/site/twiki/HiveDR.twiki                |  6 +++
 10 files changed, 61 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index 37dc5c9..adf0459 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -172,6 +172,12 @@
                     <artifactId>hadoop-distcp</artifactId>
                     <scope>compile</scope>
                 </dependency>
+
+                <dependency>
+                    <groupId>org.apache.derby</groupId>
+                    <artifactId>derby</artifactId>
+                    <version>${derby.version}</version>
+                </dependency>
             </dependencies>
         </profile>
     </profiles>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
index 5490232..c9ad47e 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java
@@ -64,6 +64,9 @@ public enum HiveDRArgs {
     REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false),
     DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false),
 
+    // Set to true if TDE is enabled
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
+
     // Map Bandwidth
     DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false),
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
index 28515e4..868ec8d 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java
@@ -120,7 +120,7 @@ public class HiveDROptions {
     }
 
     public int getMaxEvents() {
-        return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS));
+        return Integer.parseInt(context.get(HiveDRArgs.MAX_EVENTS));
     }
 
     public boolean shouldKeepHistory() {
@@ -147,6 +147,11 @@ public class HiveDROptions {
         return context.get(HiveDRArgs.EXECUTION_STAGE);
     }
 
+    public boolean isTDEEncryptionEnabled() {
+        return StringUtils.isEmpty(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED))
+                ? false : Boolean.valueOf(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED));
+    }
+
     public boolean shouldBlock() {
         return true;
     }
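
Side note: Boolean.parseBoolean is null-safe (it returns false for null and for
anything other than "true", case-insensitively), so the new accessor could be
reduced to a one-liner; a drop-in sketch:

    public boolean isTDEEncryptionEnabled() {
        // Boolean.parseBoolean(null) returns false, so no isEmpty guard is needed.
        return Boolean.parseBoolean(context.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED));
    }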

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index e141800..17eec22 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -271,8 +271,8 @@ public class HiveDRTool extends Configured implements Tool {
     private String sourceEvents() throws Exception {
         MetaStoreEventSourcer defaultSourcer = null;
         String inputFilename = null;
-        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH +File.separator+inputOptions.getJobName()+"/"
-                +inputOptions.getJobName()+".id";
+        String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH + File.separator
+                + inputOptions.getJobName() + File.separator + inputOptions.getJobName() + ".id";
         Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
         try {
             HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
index d075bfb..3b088f7 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java
@@ -67,10 +67,12 @@ public class EventUtils {
     private String jobNN = null;
     private String jobNNKerberosPrincipal = null;
     private String targetHiveServer2Uri = null;
+    private String sourceStagingPath = null;
     private String targetStagingPath = null;
     private String targetNN = null;
     private String targetNNKerberosPrincipal = null;
-    private String fullyQualifiedTargetStagingPath = null;
+    private String sourceStagingUri = null;
+    private String targetStagingUri = null;
     private List<Path> sourceCleanUpList = null;
     private List<Path> targetCleanUpList = null;
     private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class);
@@ -93,6 +95,8 @@ public class EventUtils {
         sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName());
         sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName());
         sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName());
+        sourceStagingPath = conf.get(HiveDRArgs.SOURCE_STAGING_PATH.getName())
+                + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName());
         jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName());
         jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName());
         targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName());
@@ -139,7 +143,8 @@ public class EventUtils {
 
     public void initializeFS() throws IOException {
         LOG.info("Initializing staging directory");
-        fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString();
+        sourceStagingUri = new Path(sourceNN, sourceStagingPath).toString();
+        targetStagingUri = new Path(targetNN, targetStagingPath).toString();
         sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal));
         jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal));
         targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal));
@@ -177,7 +182,7 @@ public class EventUtils {
                 LOG.info("Process the export statements for db {} table {}", dbName, tableName);
                 processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false);
                 if (!sourceCleanUpList.isEmpty()) {
-                    invokeCopy(sourceCleanUpList);
+                    invokeCopy();
                 }
             }
         } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName())
@@ -310,11 +315,11 @@ public class EventUtils {
         }
     }
 
-    public void invokeCopy(List<Path> srcStagingPaths) throws Exception {
-        DistCpOptions options = getDistCpOptions(srcStagingPaths);
+    public void invokeCopy() throws Exception {
+        DistCpOptions options = getDistCpOptions();
         DistCp distCp = new DistCp(conf, options);
-        LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()),
-                fullyQualifiedTargetStagingPath);
+        LOG.info("Started DistCp with source Path: {} \ttarget path: {}", sourceStagingUri, targetStagingUri);
+
         Job distcpJob = distCp.execute();
         LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString());
         LOG.info("Completed DistCp");
@@ -323,27 +328,21 @@ public class EventUtils {
         }
     }
 
-    public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) {
-        /*
-         * Add the fully qualified sourceNameNode to srcStagingPath uris. This will
-         * ensure DistCp will succeed when the job is run on target cluster.
-         */
-        List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>();
-        for (Path srcPath : srcStagingPaths) {
-            fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString()));
+    public DistCpOptions getDistCpOptions() {
+        // DistCpOptions takes the sources either as a file listing (a single Path) or as a List<Path>
+        List<Path> sourceUris = new ArrayList<Path>();
+        sourceUris.add(new Path(sourceStagingUri));
+        DistCpOptions distcpOptions = new DistCpOptions(sourceUris, new Path(targetStagingUri));
+
+        // setSyncFolder(true) ensures directory structure is maintained when source is copied to target
+        distcpOptions.setSyncFolder(true);
+        // Skip the CRC check when TDE is enabled: CRCs are computed over encrypted data and differ across encryption zones.
+        if (Boolean.parseBoolean(conf.get(HiveDRArgs.TDE_ENCRYPTION_ENABLED.getName()))) {
+            distcpOptions.setSkipCRC(true);
         }
-        fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]);
-
-        DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths,
-                new Path(fullyQualifiedTargetStagingPath));
-        /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be
-        copied to the same staging sir at target resulting in DuplicateFileException in DistCp.
-        */
-
-        distcpOptions.setSyncFolder(false);
         distcpOptions.setBlocking(true);
-        distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
-        distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
+        distcpOptions.setMaxMaps(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName())));
+        distcpOptions.setMapBandwidth(Integer.parseInt(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName())));
         return distcpOptions;
     }
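
One ordering subtlety worth noting (assuming the Hadoop 2.x DistCpOptions
implementation, whose setters validate eagerly): skip-CRC is only accepted
together with -update/syncFolder, so setSyncFolder(true) must be called before
setSkipCRC(true), as it is above. A hypothetical illustration of the reversed
order (same imports as the sketch near the top of this mail):

    DistCpOptions opts = new DistCpOptions(
            Arrays.asList(new Path("hdfs://sourceNN/staging/drJob")),
            new Path("hdfs://targetNN/staging/drJob"));
    opts.setSkipCRC(true);     // throws IllegalArgumentException
                               // ("Skip CRC is valid only with update options")
    opts.setSyncFolder(true);  // never reached in this order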
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
index 0494cf6..2d6b8be 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml
@@ -142,6 +142,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
@@ -240,6 +242,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>
@@ -340,6 +344,8 @@
             <arg>${clusterForJobRunWriteEP}</arg>
             <arg>-clusterForJobNNKerberosPrincipal</arg>
             <arg>${clusterForJobNNKerberosPrincipal}</arg>
+            <arg>-tdeEncryptionEnabled</arg>
+            <arg>${tdeEncryptionEnabled}</arg>
             <arg>-drJobName</arg>
             <arg>${drJobName}-${nominalTime}</arg>
             <arg>-executionStage</arg>

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
index 8d00bb5..331d57e 100644
--- a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties
@@ -102,6 +102,8 @@ replicationMaxMaps=5
 distcpMaxMaps=1
 # Change it to specify the bandwidth in MB for each mapper in DistCP
 distcpMapBandwidth=100
+# Set this flag to true if TDE encryption is enabled on source and target
+tdeEncryptionEnabled=false
 
 ##### Email Notification for Falcon instance completion
 falcon.recipe.notification.type=email

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
index 8b39673..3df89d3 100644
--- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
@@ -129,6 +129,10 @@ public class HiveReplicationRecipeTool implements Recipe {
             additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
                     recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()));
         }
+        if (StringUtils.isEmpty(
+                recipeProperties.getProperty(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName()))) {
+            additionalProperties.put(HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName(), "false");
+        }
         return additionalProperties;
     }
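
For what it's worth, java.util.Properties can express the same default inline
via getProperty(key, defaultValue); a sketch (note the differences: this covers
only a missing key, not a key explicitly set to an empty value, and it adds the
entry unconditionally):

    String tdeEnabled = recipeProperties.getProperty(
            HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName(), "false");
    additionalProperties.put(
            HiveReplicationRecipeToolOptions.TDE_ENCRYPTION_ENABLED.getName(), tdeEnabled);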
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
index ec0465d..3d69d6e 100644
--- a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
@@ -54,6 +54,7 @@ public enum HiveReplicationRecipeToolOptions {
     CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
             "Write EP of cluster on which replication job runs", false),
     CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
     HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false);
 
     private final String name;

http://git-wip-us.apache.org/repos/asf/falcon/blob/c404da28/docs/src/site/twiki/HiveDR.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/HiveDR.twiki b/docs/src/site/twiki/HiveDR.twiki
index a8f6aee..cf35694 100644
--- a/docs/src/site/twiki/HiveDR.twiki
+++ b/docs/src/site/twiki/HiveDR.twiki
@@ -53,6 +53,11 @@ Following is the prerequisites to use Hive DR
    in Falcon conf client.properties. Now update the copied recipe properties file with required attributes to replicate metadata and data from source cluster to
    destination cluster for Hive DR.
 
+   * *Note: HiveDR on TDE-encrypted clusters*
+   When submitting a HiveDR recipe in a Kerberos-secured setup, the source and target staging directories
+   may be encrypted with Transparent Data Encryption (TDE). If your cluster directories are TDE-encrypted,
+   set "tdeEncryptionEnabled=true" in the recipe properties file. The default value for this property is "false".
+
 ---+++ Submit Hive DR recipe
    After updating the recipe properties file with required attributes in directory path or in falcon.recipe.path,
    there are two ways of submitting the Hive DR recipe:
@@ -72,3 +77,4 @@ Following is the prerequisites to use Hive DR
 *Note:*
    * Recipe properties file, workflow file and template file name must match to the recipe name, it must be unique and in the same directory.
    * If kerberos security is enabled on cluster, use the secure templates for Hive DR from $FALCON_HOME/data-mirroring/hive-disaster-recovery .
+