Posted to commits@falcon.apache.org by pi...@apache.org on 2016/01/28 16:39:33 UTC

falcon git commit: FALCON-1777 Add regression for HDFS replication (recipe). Contributed by Paul Isaychuk

Repository: falcon
Updated Branches:
  refs/heads/master 3b8d32ffd -> 33d72f77a


FALCON-1777 Add regression for HDFS replication (recipe). Contributed by Paul Isaychuk


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/33d72f77
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/33d72f77
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/33d72f77

Branch: refs/heads/master
Commit: 33d72f77ac11dfd344d2905afc4d42e056f450b5
Parents: 3b8d32f
Author: Paul Isaychuk <pi...@apache.org>
Authored: Thu Jan 28 17:37:40 2016 +0200
Committer: Paul Isaychuk <pi...@apache.org>
Committed: Thu Jan 28 17:37:40 2016 +0200

----------------------------------------------------------------------
 falcon-regression/CHANGES.txt                   |   2 +
 .../regression/Entities/RecipeMerlin.java       |  40 +++---
 .../regression/hive/dr/HdfsRecipeTest.java      | 126 +++++++++++++++++++
 .../hive-disaster-recovery-workflow.xml         |   8 +-
 .../hive-disaster-recovery.properties           |  21 ++--
 5 files changed, 169 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index f2df91a..e6664f3 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+   FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk)
+
    FALCON-1699 Test fixes for RetentionTest, LineageApiTest, TouchAPIPrismAndServerTest, FeedReplicationTest and a few fortifications (Paul Isaychuk via Pragya Mittal)
 
    FALCON-1698 New tests for ProcessSetupTest, ClusterSetupTest, UI test fixes (Paul Isaychuk via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
index 40fec08..9b9cff2 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
@@ -131,27 +131,35 @@ public final class RecipeMerlin {
 
     public RecipeMerlin withSourceCluster(ClusterMerlin sourceCluster) {
         this.srcCluster = sourceCluster;
-        properties.setProperty("sourceCluster", sourceCluster.getName());
-        properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
-        properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
-        //properties.setProperty("sourceServicePrincipal",
-        //    sourceCluster.getProperty("hive.metastore.kerberos.principal"));
-        properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
-        properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
-        properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+            properties.setProperty("drSourceClusterFS", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+        } else {
+            properties.setProperty("sourceCluster", sourceCluster.getName());
+            properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
+            properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
+            //properties.setProperty("sourceServicePrincipal",
+            //    sourceCluster.getProperty("hive.metastore.kerberos.principal"));
+            properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
+            properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+            properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        }
         return this;
     }
 
     public RecipeMerlin withTargetCluster(ClusterMerlin targetCluster) {
         this.tgtCluster = targetCluster;
-        properties.setProperty("targetCluster", targetCluster.getName());
-        properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
-        properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
-        //properties.setProperty("targetServicePrincipal",
-        //    targetCluster.getProperty("hive.metastore.kerberos.principal"));
-        properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
-        properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
-        properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+            properties.setProperty("drTargetClusterFS", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+        } else {
+            properties.setProperty("targetCluster", targetCluster.getName());
+            properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
+            properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
+            //properties.setProperty("targetServicePrincipal",
+            //    targetCluster.getProperty("hive.metastore.kerberos.principal"));
+            properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
+            properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+            properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+        }
         return this;
     }
 

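For reference: with this change, withSourceCluster()/withTargetCluster() populate only the
DistCp filesystem endpoints (drSourceClusterFS/drTargetClusterFS) for HDFS_REPLICATION
recipes, and keep the fuller Hive property set for Hive DR. A minimal usage sketch built
on the API in this diff (cluster fixtures hypothetical):

    // srcCluster and tgtCluster are hypothetical ClusterMerlin fixtures.
    RecipeMerlin recipe = RecipeMerlin
        .readFromDir("HdfsRecipe", FalconCLI.RecipeOperation.HDFS_REPLICATION)
        .withRecipeCluster(srcCluster);
    // For HDFS_REPLICATION this sets only drSourceClusterFS/drTargetClusterFS;
    // the metastore, HiveServer2 and staging properties are skipped.
    recipe.withSourceCluster(srcCluster).withTargetCluster(tgtCluster);
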
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
new file mode 100644
index 0000000..05b9cf4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.MatrixUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Hdfs recipe test.
+ */
+@Test(groups = "embedded")
+public class HdfsRecipeTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(HdfsRecipeTest.class);
+    private final ColoHelper cluster = servers.get(0);
+    private final ColoHelper cluster2 = servers.get(1);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final FileSystem clusterFS2 = serverFS.get(1);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private final OozieClient clusterOC2 = serverOC.get(1);
+    private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HdfsDR";
+    private String sourceDataLocation = baseTestHDFSDir + "/source";
+    private String targetDataLocation = baseTestHDFSDir + "/target";
+    private RecipeMerlin hdfsRecipe;
+
+    @DataProvider
+    public Object[][] getRecipeLocation() {
+        return MatrixUtil.crossProduct(RecipeExecLocation.values());
+    }
+
+    private void setUp(RecipeExecLocation recipeExecLocation) throws Exception {
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster);
+        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+        final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+        String recipeDir = "HdfsRecipe";
+        Bundle.submitCluster(recipeExecLocation.getRecipeBundle(bundles[0], bundles[1]));
+        hdfsRecipe = RecipeMerlin.readFromDir(recipeDir, FalconCLI.RecipeOperation.HDFS_REPLICATION)
+            .withRecipeCluster(recipeExecLocation.getRecipeCluster(srcCluster, tgtCluster));
+        hdfsRecipe.withSourceCluster(srcCluster)
+            .withTargetCluster(tgtCluster)
+            .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+            .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(15));
+        hdfsRecipe.setUniqueName(this.getClass().getSimpleName());
+    }
+
+    /**
+     * Test recipe based replication with 1 source and 1 target.
+     */
+    @Test(dataProvider = "getRecipeLocation")
+    public void test1Source1Target(RecipeExecLocation execLocation) throws Exception {
+        setUp(execLocation);
+        hdfsRecipe.withSourceDir(sourceDataLocation).withTargetDir(targetDataLocation);
+        final List<String> command = hdfsRecipe.getSubmissionCommand();
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        HadoopUtil.copyDataToFolder(clusterFS, sourceDataLocation, OSUtil.NORMAL_INPUT);
+
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS, new Path(sourceDataLocation));
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS2, new Path(targetDataLocation));
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        try {
+            prism.getProcessHelper().deleteByName(hdfsRecipe.getName(), null);
+        } catch (Exception e) {
+            LOGGER.info("Deletion of process: " + hdfsRecipe.getName() + " failed with exception: " + e);
+        }
+        removeTestClassEntities();
+        cleanTestsDirs();
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
index 7c4c53a..aa820d0 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
@@ -47,6 +47,10 @@
                     <name>oozie.launcher.oozie.libpath</name>
                     <value>${wf:conf("falcon.libpath")}</value>
                 </property>
+                <property>
+                    <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+                    <value>${drSourceClusterFS},${drTargetClusterFS}</value>
+                </property>
             </configuration>
             <main-class>org.apache.falcon.replication.FeedReplicator</main-class>
             <arg>-Dmapred.job.queue.name=${queueName}</arg>
@@ -56,7 +60,7 @@
             <arg>-mapBandwidth</arg>
             <arg>${distcpMapBandwidth}</arg>
             <arg>-sourcePaths</arg>
-            <arg>${drSourceClusterFS}${drSourceDir}</arg>
+            <arg>${drSourceDir}</arg>
             <arg>-targetPath</arg>
             <arg>${drTargetClusterFS}${drTargetDir}</arg>
             <arg>-falconFeedStorageType</arg>
@@ -64,7 +68,7 @@
             <arg>-availabilityFlag</arg>
             <arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
             <arg>-counterLogDir</arg>
-            <arg>${logDir}/job-${nominalTime}</arg>
+            <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg>
         </java>
         <ok to="end"/>
         <error to="fail"/>

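The new oozie.launcher.mapreduce.job.hdfs-servers property lists both cluster
filesystems so the launcher job can obtain HDFS delegation tokens for the remote
namenode before FeedReplicator runs DistCp. A minimal sketch of the equivalent
programmatic setting (endpoints hypothetical):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // List every namenode the job touches so tokens are fetched for each.
    conf.set("mapreduce.job.hdfs-servers",
        "hdfs://source-nn:8020,hdfs://target-nn:8020"); // hypothetical endpoints
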
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
index 7c95db7..6c715f3 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
@@ -26,14 +26,13 @@ falcon.recipe.name=hdfs-replication-monthly
 falcon.recipe.workflow.name=hdfs-dr-workflow
 # Provide the WF absolute path. This can be an HDFS or local FS path. If the WF is on the local FS, it will be copied to HDFS
 falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
-# Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS
-#falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
 
 ##### Cluster properties
+
 # Cluster where job should run
 falcon.recipe.cluster.name=primaryCluster
 # Change the cluster HDFS write endpoint here. This is mandatory.
-falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
 # Change the cluster validity start time here
 falcon.recipe.cluster.validity.start=2015-03-13T00:00Z
 # Change the cluster validity end time here
@@ -43,10 +42,6 @@ falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
 # Change the recipe frequency here. Valid frequency types are minutes, hours, days, months
 falcon.recipe.process.frequency=minutes(5)
 
-##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
-##### Uncomment to add tags
-#falcon.recipe.tags=
-
 ##### Retry policy properties
 
 falcon.recipe.retry.policy=periodic
@@ -54,14 +49,20 @@ falcon.recipe.retry.delay=minutes(30)
 falcon.recipe.retry.attempts=3
 falcon.recipe.retry.onTimeout=false
 
+##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
+##### Uncomment to add tags
+#falcon.recipe.tags=
+
 ##### ACL properties - Uncomment and change ACL if authorization is enabled
 
-falcon.recipe.acl.owner=ambari-qa
-falcon.recipe.acl.group=users
-falcon.recipe.acl.permission=0x755
+#falcon.recipe.acl.owner=ambari-qa
+#falcon.recipe.acl.group=users
+#falcon.recipe.acl.permission=0x755
+#falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
 
 ##### Custom Job properties
 
+# Specify multiple comma-separated source directories
 drSourceDir=/user/falcon_qa/dr/test/primaryCluster/input
 drSourceClusterFS=hdfs://240.0.0.10:8020
 drTargetDir=/user/falcon_qa/dr/test/backupCluster/input
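
With these properties in place, HdfsRecipeTest above drives submission through the
Falcon CLI wrapper; a minimal sketch of that flow, reusing only the API shown in
this commit (paths hypothetical):

    // Paths are hypothetical; the test derives them from its HDFS test dir.
    hdfsRecipe.withSourceDir("/tmp/dr/source").withTargetDir("/tmp/dr/target");
    final List<String> command = hdfsRecipe.getSubmissionCommand();
    Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");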