You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pi...@apache.org on 2016/01/28 16:39:33 UTC
falcon git commit: FALCON-1777 Add regression for HDFS replication
(recipe). Contributed by Paul Isaychuk
Repository: falcon
Updated Branches:
refs/heads/master 3b8d32ffd -> 33d72f77a
FALCON-1777 Add regression for HDFS replication (recipe). Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/33d72f77
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/33d72f77
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/33d72f77
Branch: refs/heads/master
Commit: 33d72f77ac11dfd344d2905afc4d42e056f450b5
Parents: 3b8d32f
Author: Paul Isaychuk <pi...@apache.org>
Authored: Thu Jan 28 17:37:40 2016 +0200
Committer: Paul Isaychuk <pi...@apache.org>
Committed: Thu Jan 28 17:37:40 2016 +0200
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../regression/Entities/RecipeMerlin.java | 40 +++---
.../regression/hive/dr/HdfsRecipeTest.java | 126 +++++++++++++++++++
.../hive-disaster-recovery-workflow.xml | 8 +-
.../hive-disaster-recovery.properties | 21 ++--
5 files changed, 169 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index f2df91a..e6664f3 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -5,6 +5,8 @@ Trunk (Unreleased)
INCOMPATIBLE CHANGES
NEW FEATURES
+ FALCON-1777 Add regression for HDFS replication (recipe) (Paul Isaychuk)
+
FALCON-1699 Test fixes for RetentionTest, LineageApiTest, TouchAPIPrismAndServerTest, FeedReplicationTest and few fortifications(Paul Isaychuk via Pragya Mittal)
FALCON-1698 New tests for ProcessSetupTest, ClusterSetupTest, UI test fixes(Paul Isaychuk via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
index 40fec08..9b9cff2 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/RecipeMerlin.java
@@ -131,27 +131,35 @@ public final class RecipeMerlin {
public RecipeMerlin withSourceCluster(ClusterMerlin sourceCluster) {
this.srcCluster = sourceCluster;
- properties.setProperty("sourceCluster", sourceCluster.getName());
- properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
- properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
- //properties.setProperty("sourceServicePrincipal",
- // sourceCluster.getProperty("hive.metastore.kerberos.principal"));
- properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
- properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
- properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+ if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+ properties.setProperty("drSourceClusterFS", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+ } else {
+ properties.setProperty("sourceCluster", sourceCluster.getName());
+ properties.setProperty("sourceMetastoreUri", sourceCluster.getProperty("hive.metastore.uris"));
+ properties.setProperty("sourceHiveServer2Uri", sourceCluster.getProperty("hive.server2.uri"));
+ //properties.setProperty("sourceServicePrincipal",
+ // sourceCluster.getProperty("hive.metastore.kerberos.principal"));
+ properties.setProperty("sourceStagingPath", sourceCluster.getLocation("staging"));
+ properties.setProperty("sourceNN", sourceCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+ properties.setProperty("sourceRM", sourceCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+ }
return this;
}
public RecipeMerlin withTargetCluster(ClusterMerlin targetCluster) {
this.tgtCluster = targetCluster;
- properties.setProperty("targetCluster", targetCluster.getName());
- properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
- properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
- //properties.setProperty("targetServicePrincipal",
- // targetCluster.getProperty("hive.metastore.kerberos.principal"));
- properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
- properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
- properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+ if (recipeOperation == FalconCLI.RecipeOperation.HDFS_REPLICATION) {
+ properties.setProperty("drTargetClusterFS", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+ } else {
+ properties.setProperty("targetCluster", targetCluster.getName());
+ properties.setProperty("targetMetastoreUri", targetCluster.getProperty("hive.metastore.uris"));
+ properties.setProperty("targetHiveServer2Uri", targetCluster.getProperty("hive.server2.uri"));
+ //properties.setProperty("targetServicePrincipal",
+ // targetCluster.getProperty("hive.metastore.kerberos.principal"));
+ properties.setProperty("targetStagingPath", targetCluster.getLocation("staging"));
+ properties.setProperty("targetNN", targetCluster.getInterfaceEndpoint(Interfacetype.WRITE));
+ properties.setProperty("targetRM", targetCluster.getInterfaceEndpoint(Interfacetype.EXECUTE));
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
new file mode 100644
index 0000000..05b9cf4
--- /dev/null
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.hive.dr;
+
+import org.apache.falcon.cli.FalconCLI;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.regression.Entities.ClusterMerlin;
+import org.apache.falcon.regression.Entities.RecipeMerlin;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.util.AssertUtil;
+import org.apache.falcon.regression.core.util.BundleUtil;
+import org.apache.falcon.regression.core.util.HadoopUtil;
+import org.apache.falcon.regression.core.util.InstanceUtil;
+import org.apache.falcon.regression.core.util.MatrixUtil;
+import org.apache.falcon.regression.core.util.OSUtil;
+import org.apache.falcon.regression.core.util.TimeUtil;
+import org.apache.falcon.regression.testHelper.BaseTestClass;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.OozieClient;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Regression test for HDFS replication driven by a Falcon recipe.
+ *
+ * <p>A recipe is read from the {@code HdfsRecipe} resource directory, bound to a source and a
+ * target cluster, submitted through the Falcon CLI and then observed via Oozie until the first
+ * instance succeeds. The test is executed once per {@link RecipeExecLocation} value.
+ */
+@Test(groups = "embedded")
+public class HdfsRecipeTest extends BaseTestClass {
+    private static final Logger LOGGER = Logger.getLogger(HdfsRecipeTest.class);
+
+    // First server acts as the replication source, second one as the target.
+    private final ColoHelper cluster = servers.get(0);
+    private final ColoHelper cluster2 = servers.get(1);
+    private final FileSystem clusterFS = serverFS.get(0);
+    private final FileSystem clusterFS2 = serverFS.get(1);
+    private final OozieClient clusterOC = serverOC.get(0);
+    private final OozieClient clusterOC2 = serverOC.get(1);
+
+    private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HdfsDR";
+    private final String sourceDataLocation = baseTestHDFSDir + "/source";
+    private final String targetDataLocation = baseTestHDFSDir + "/target";
+
+    // Assigned by setUp(); stays null if setUp() fails before the recipe is read.
+    private RecipeMerlin hdfsRecipe;
+
+    /**
+     * Supplies every {@link RecipeExecLocation} value so each test runs once per location.
+     *
+     * @return the cross product built by {@link MatrixUtil} from the enum values
+     */
+    @DataProvider
+    public Object[][] getRecipeLocation() {
+        return MatrixUtil.crossProduct(RecipeExecLocation.values());
+    }
+
+    /**
+     * Prepares two unique bundles, submits the cluster entity chosen by the execution location
+     * and builds the HDFS replication recipe bound to the source and target clusters.
+     *
+     * @param recipeExecLocation decides which bundle is submitted and which cluster runs the recipe
+     * @throws Exception if bundle preparation, cluster submission or recipe loading fails
+     */
+    private void setUp(RecipeExecLocation recipeExecLocation) throws Exception {
+        bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster);
+        bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2);
+        bundles[0].generateUniqueBundle(this);
+        bundles[1].generateUniqueBundle(this);
+        final ClusterMerlin srcCluster = bundles[0].getClusterElement();
+        final ClusterMerlin tgtCluster = bundles[1].getClusterElement();
+        String recipeDir = "HdfsRecipe";
+        Bundle.submitCluster(recipeExecLocation.getRecipeBundle(bundles[0], bundles[1]));
+        hdfsRecipe = RecipeMerlin.readFromDir(recipeDir, FalconCLI.RecipeOperation.HDFS_REPLICATION)
+            .withRecipeCluster(recipeExecLocation.getRecipeCluster(srcCluster, tgtCluster));
+        // Validity window is relative to "now" (offsets -5 and 15; presumably minutes - confirm
+        // against TimeUtil) so that an instance is due right after submission.
+        hdfsRecipe.withSourceCluster(srcCluster)
+            .withTargetCluster(tgtCluster)
+            .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes))
+            .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(15));
+        hdfsRecipe.setUniqueName(this.getClass().getSimpleName());
+    }
+
+    /**
+     * Test recipe based replication with 1 source and 1 target.
+     *
+     * @param execLocation where the recipe is executed
+     * @throws Exception on any failure while submitting or monitoring the recipe
+     */
+    @Test(dataProvider = "getRecipeLocation")
+    public void test1Source1Target(RecipeExecLocation execLocation) throws Exception {
+        setUp(execLocation);
+        hdfsRecipe.withSourceDir(sourceDataLocation).withTargetDir(targetDataLocation);
+        final List<String> command = hdfsRecipe.getSubmissionCommand();
+        Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed.");
+
+        // The first instance is expected to wait until source data shows up.
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.WAITING, EntityType.PROCESS);
+
+        HadoopUtil.copyDataToFolder(clusterFS, sourceDataLocation, OSUtil.NORMAL_INPUT);
+
+        InstanceUtil.waitTillInstanceReachState(execLocation.getRecipeOC(clusterOC, clusterOC2),
+            hdfsRecipe.getName(), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
+
+        //check if data has been replicated correctly
+        List<Path> cluster1ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS, new Path(sourceDataLocation));
+        List<Path> cluster2ReplicatedData = HadoopUtil
+            .getAllFilesRecursivelyHDFS(clusterFS2, new Path(targetDataLocation));
+
+        AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
+    }
+
+    /**
+     * Deletes the recipe process on a best-effort basis and always cleans up test entities
+     * and directories.
+     *
+     * @throws IOException if cleaning the test entities or directories fails
+     */
+    @AfterMethod(alwaysRun = true)
+    public void tearDown() throws IOException {
+        try {
+            // hdfsRecipe is null when setUp() failed before the recipe was read; without this
+            // guard the log statement in the catch block would throw a second NPE and the
+            // cleanup below would never run.
+            if (hdfsRecipe != null) {
+                try {
+                    prism.getProcessHelper().deleteByName(hdfsRecipe.getName(), null);
+                } catch (Exception e) {
+                    LOGGER.info("Deletion of process: " + hdfsRecipe.getName()
+                        + " failed with exception: " + e);
+                }
+            }
+        } finally {
+            removeTestClassEntities();
+            cleanTestsDirs();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
index 7c4c53a..aa820d0 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery-workflow.xml
@@ -47,6 +47,10 @@
<name>oozie.launcher.oozie.libpath</name>
<value>${wf:conf("falcon.libpath")}</value>
</property>
+ <property>
+ <name>oozie.launcher.mapreduce.job.hdfs-servers</name>
+ <value>${drSourceClusterFS},${drTargetClusterFS}</value>
+ </property>
</configuration>
<main-class>org.apache.falcon.replication.FeedReplicator</main-class>
<arg>-Dmapred.job.queue.name=${queueName}</arg>
@@ -56,7 +60,7 @@
<arg>-mapBandwidth</arg>
<arg>${distcpMapBandwidth}</arg>
<arg>-sourcePaths</arg>
- <arg>${drSourceClusterFS}${drSourceDir}</arg>
+ <arg>${drSourceDir}</arg>
<arg>-targetPath</arg>
<arg>${drTargetClusterFS}${drTargetDir}</arg>
<arg>-falconFeedStorageType</arg>
@@ -64,7 +68,7 @@
<arg>-availabilityFlag</arg>
<arg>${availabilityFlag == 'NA' ? "NA" : availabilityFlag}</arg>
<arg>-counterLogDir</arg>
- <arg>${logDir}/job-${nominalTime}</arg>
+ <arg>${logDir}/job-${nominalTime}/${srcClusterName == 'NA' ? '' : srcClusterName}</arg>
</java>
<ok to="end"/>
<error to="fail"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/33d72f77/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
index 7c95db7..6c715f3 100644
--- a/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
+++ b/falcon-regression/merlin/src/test/resources/HdfsRecipe/hive-disaster-recovery.properties
@@ -26,14 +26,13 @@ falcon.recipe.name=hdfs-replication-monthly
falcon.recipe.workflow.name=hdfs-dr-workflow
# Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
-# Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS
-#falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib
##### Cluster properties
+
# Cluster where job should run
falcon.recipe.cluster.name=primaryCluster
# Change the cluster hdfs write end point here. This is mandatory.
-falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
# Change the cluster validity start time here
falcon.recipe.cluster.validity.start=2015-03-13T00:00Z
# Change the cluster validity end time here
@@ -43,10 +42,6 @@ falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
# Change the recipe frequency here. Valid frequency type are minutes, hours, days, months
falcon.recipe.process.frequency=minutes(5)
-##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
-##### Uncomment to add tags
-#falcon.recipe.tags=
-
##### Retry policy properties
falcon.recipe.retry.policy=periodic
@@ -54,14 +49,20 @@ falcon.recipe.retry.delay=minutes(30)
falcon.recipe.retry.attempts=3
falcon.recipe.retry.onTimeout=false
+##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma
+##### Uncomment to add tags
+#falcon.recipe.tags=
+
##### ACL properties - Uncomment and change ACL if authorization is enabled
-falcon.recipe.acl.owner=ambari-qa
-falcon.recipe.acl.group=users
-falcon.recipe.acl.permission=0x755
+#falcon.recipe.acl.owner=ambari-qa
+#falcon.recipe.acl.group=users
+#falcon.recipe.acl.permission=0x755
+#falcon.recipe.nn.principal=nn/_HOST@EXAMPLE.COM
##### Custom Job properties
+# Specify multiple comma separated source directories
drSourceDir=/user/falcon_qa/dr/test/primaryCluster/input
drSourceClusterFS=hdfs://240.0.0.10:8020
drTargetDir=/user/falcon_qa/dr/test/backupCluster/input