Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/11 10:20:02 UTC
[2/5] falcon git commit: FALCON-1188 Falcon support for Hive
Replication. Contributed by Venkat Ranganathan.
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
new file mode 100644
index 0000000..c441998
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-workflow.xml
@@ -0,0 +1,293 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'>
+ <start to='last-event'/>
+ <action name="last-event">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg>
+ <arg>-executionStage</arg>
+ <arg>lastevents</arg>
+ </java>
+ <ok to="export-dr-replication"/>
+ <error to="failure"/>
+ </action>
+ <!-- Export Replication action -->
+ <action name="export-dr-replication">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-replicationMaxMaps</arg>
+ <arg>${replicationMaxMaps}</arg>
+ <arg>-distcpMaxMaps</arg>
+ <arg>${distcpMaxMaps}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-distcpMapBandwidth</arg>
+ <arg>${distcpMapBandwidth}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg>
+ <arg>-executionStage</arg>
+ <arg>export</arg>
+ </java>
+ <ok to="import-dr-replication"/>
+ <error to="failure"/>
+ </action>
+ <!-- Import Replication action -->
+ <action name="import-dr-replication">
+ <java>
+ <job-tracker>${jobTracker}</job-tracker>
+ <name-node>${nameNode}</name-node>
+ <configuration>
+ <property> <!-- hadoop 2 parameter -->
+ <name>oozie.launcher.mapreduce.job.user.classpath.first</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>mapred.job.queue.name</name>
+ <value>${queueName}</value>
+ </property>
+ <property>
+ <name>oozie.launcher.mapred.job.priority</name>
+ <value>${jobPriority}</value>
+ </property>
+ <property>
+ <name>oozie.use.system.libpath</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>oozie.action.sharelib.for.java</name>
+ <value>distcp,hive,hive2,hcatalog</value>
+ </property>
+ </configuration>
+ <main-class>org.apache.falcon.hive.HiveDRTool</main-class>
+ <arg>-Dmapred.job.queue.name=${queueName}</arg>
+ <arg>-Dmapred.job.priority=${jobPriority}</arg>
+ <arg>-falconLibPath</arg>
+ <arg>${wf:conf("falcon.libpath")}</arg>
+ <arg>-replicationMaxMaps</arg>
+ <arg>${replicationMaxMaps}</arg>
+ <arg>-distcpMaxMaps</arg>
+ <arg>${distcpMaxMaps}</arg>
+ <arg>-sourceCluster</arg>
+ <arg>${sourceCluster}</arg>
+ <arg>-sourceMetastoreUri</arg>
+ <arg>${sourceMetastoreUri}</arg>
+ <arg>-sourceHiveServer2Uri</arg>
+ <arg>${sourceHiveServer2Uri}</arg>
+ <arg>-sourceDatabase</arg>
+ <arg>${sourceDatabase}</arg>
+ <arg>-sourceTable</arg>
+ <arg>${sourceTable}</arg>
+ <arg>-sourceStagingPath</arg>
+ <arg>${sourceStagingPath}</arg>
+ <arg>-sourceNN</arg>
+ <arg>${sourceNN}</arg>
+ <arg>-targetCluster</arg>
+ <arg>${targetCluster}</arg>
+ <arg>-targetMetastoreUri</arg>
+ <arg>${targetMetastoreUri}</arg>
+ <arg>-targetHiveServer2Uri</arg>
+ <arg>${targetHiveServer2Uri}</arg>
+ <arg>-targetStagingPath</arg>
+ <arg>${targetStagingPath}</arg>
+ <arg>-targetNN</arg>
+ <arg>${targetNN}</arg>
+ <arg>-maxEvents</arg>
+ <arg>${maxEvents}</arg>
+ <arg>-distcpMapBandwidth</arg>
+ <arg>${distcpMapBandwidth}</arg>
+ <arg>-clusterForJobRun</arg>
+ <arg>${clusterForJobRun}</arg>
+ <arg>-clusterForJobRunWriteEP</arg>
+ <arg>${clusterForJobRunWriteEP}</arg>
+ <arg>-drJobName</arg>
+ <arg>${drJobName}-${nominalTime}</arg>
+ <arg>-executionStage</arg>
+ <arg>import</arg>
+ </java>
+ <ok to="success"/>
+ <error to="failure"/>
+ </action>
+ <decision name="success">
+ <switch>
+ <case to="successAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="end"/>
+ </switch>
+ </decision>
+ <decision name="failure">
+ <switch>
+ <case to="failureAlert">
+ ${drNotificationReceivers ne 'NA'}
+ </case>
+ <default to="fail"/>
+ </switch>
+ </decision>
+ <action name="successAlert">
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject>
+ <body>
+ The Hive DR workflow ${wf:id()} is successful.
+ Source = ${sourceCluster}
+ Target = ${targetCluster}
+ DB Name = ${sourceDatabase}
+ Table Name = ${sourceTable}
+ </body>
+ </email>
+ <ok to="end"/>
+ <error to="end"/>
+ </action>
+ <action name="failureAlert">
+ <email xmlns="uri:oozie:email-action:0.2">
+ <to>${drNotificationReceivers}</to>
+ <subject>ERROR: Hive DR workflow ${drJobName} failed</subject>
+ <body>
+ The Hive DR workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())}
+ Source = ${sourceCluster}
+ Target = ${targetCluster}
+ DB Name = ${sourceDatabase}
+ Table Name = ${sourceTable}
+ </body>
+ </email>
+ <ok to="end"/>
+ <error to="fail"/>
+ </action>
+ <kill name="fail">
+ <message>
+ Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+ </message>
+ </kill>
+ <end name="end"/>
+</workflow-app>
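For orientation: each java action above launches the same main class, differing only in the -executionStage value (lastevents, export, import). Resolved against the sample properties further down, the export stage amounts to a command line roughly like the following (a sketch; the jar name is an illustrative assumption, and in practice Oozie assembles the real classpath from the distcp/hive/hive2/hcatalog sharelibs rather than a hadoop jar invocation):

    hadoop jar <falcon-hive-replication.jar> org.apache.falcon.hive.HiveDRTool \
        -Dmapred.job.queue.name=default \
        -sourceCluster primaryCluster -sourceMetastoreUri thrift://localhost:9083 \
        -targetCluster backupCluster -targetMetastoreUri thrift://localhost:9083 \
        -maxEvents -1 -drJobName hive-dr-job -executionStage export

(the remaining -source*/-target* flags follow the same pattern as the <arg> pairs above).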
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
----------------------------------------------------------------------
diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
new file mode 100644
index 0000000..42ae30b
--- /dev/null
+++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery.properties
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+##### NOTE: This is a TEMPLATE file which can be copied and edited
+
+##### Recipe properties
+falcon.recipe.name=hive-disaster-recovery
+
+
+##### Workflow properties
+falcon.recipe.workflow.name=hive-dr-workflow
+# Provide the workflow absolute path. This can be an HDFS or local FS path. If the workflow is on the local FS, it will be copied to HDFS.
+falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-workflow.xml
+
+##### Cluster properties
+
+# Change the name of the cluster where the replication job should run here
+falcon.recipe.cluster.name=backupCluster
+# Change the cluster HDFS write endpoint here. This is mandatory.
+falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020
+# Change the cluster validity start time here
+falcon.recipe.cluster.validity.start=2014-10-01T00:00Z
+# Change the cluster validity end time here
+falcon.recipe.cluster.validity.end=2016-12-30T00:00Z
+
+##### Scheduling properties
+
+# Change the process frequency here. Valid frequency types are minutes, hours, days, months
+falcon.recipe.process.frequency=minutes(60)
+
+##### Retry policy properties
+
+falcon.recipe.retry.policy=periodic
+falcon.recipe.retry.delay=minutes(30)
+falcon.recipe.retry.attempts=3
+
+##### Tag properties - An optional comma-separated list of tags as key=value pairs
+##### Uncomment to add tags
+#falcon.recipe.tags=owner=landing,pipeline=adtech
+
+##### ACL properties - Uncomment and change ACL if authorization is enabled
+
+#falcon.recipe.acl.owner=testuser
+#falcon.recipe.acl.group=group
+#falcon.recipe.acl.permission=0x755
+
+##### Custom Job properties
+
+##### Source Cluster DR properties
+sourceCluster=primaryCluster
+sourceMetastoreUri=thrift://localhost:9083
+sourceHiveServer2Uri=hive2://localhost:10000
+# For DB level replication, to replicate multiple databases specify a comma-separated list of databases
+sourceDatabase=default
+# For DB level replication, specify * for sourceTable.
+# For table level replication, to replicate multiple tables specify a comma-separated list of tables
+sourceTable=testtable_dr
+sourceStagingPath=/apps/hive/tools/dr
+sourceNN=hdfs://localhost:8020
+
+##### Target Cluster DR properties
+targetCluster=backupCluster
+targetMetastoreUri=thrift://localhost:9083
+targetHiveServer2Uri=hive2://localhost:10000
+targetStagingPath=/apps/hive/tools/dr
+targetNN=hdfs://localhost:8020
+
+# Caps the maximum number of events processed each time the job runs. Set it according to your bandwidth limit.
+# Setting it to -1 will process all pending events but can saturate the bandwidth. Use it judiciously!
+maxEvents=-1
+# Change it to specify the maximum number of mappers for replication
+replicationMaxMaps=5
+# Change it to specify the maximum number of mappers for DistCp
+distcpMaxMaps=1
+# Change it to specify the bandwidth in MB/s for each mapper in DistCp
+distcpMapBandwidth=100
+
+##### Email on failure
+drNotificationReceivers=NA
\ No newline at end of file
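Once edited, this template is driven by the new recipe CLI wiring in this patch. A minimal submission would look like (a sketch; it assumes the client property falcon.recipe.path points at the directory containing hive-disaster-recovery.properties, the recipe template, and the workflow XML):

    falcon recipe -name hive-disaster-recovery -operation HIVE_DISASTER_RECOVERY

The -operation value is matched case-insensitively against the RecipeOperation enum introduced in FalconCLI below.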
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/pom.xml
----------------------------------------------------------------------
diff --git a/client/pom.xml b/client/pom.xml
index afa91c9..c162125 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -105,6 +105,12 @@
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ <version>${hive.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 148f789..11f6bff 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -124,8 +124,17 @@ public class FalconCLI {
// Recipe Command
public static final String RECIPE_CMD = "recipe";
public static final String RECIPE_NAME = "name";
+ public static final String RECIPE_OPERATION = "operation";
public static final String RECIPE_TOOL_CLASS_NAME = "tool";
+ /**
+ * Recipe operation enum.
+ */
+ public enum RecipeOperation {
+ HDFS_REPLICATION,
+ HIVE_DISASTER_RECOVERY
+ }
+
private final Properties clientProperties;
public FalconCLI() throws Exception {
@@ -914,6 +923,9 @@ public class FalconCLI {
Option recipeToolClassName = new Option(RECIPE_TOOL_CLASS_NAME, true, "recipe class");
recipeOptions.addOption(recipeToolClassName);
+ Option recipeOperation = new Option(RECIPE_OPERATION, true, "recipe operation");
+ recipeOptions.addOption(recipeOperation);
+
return recipeOptions;
}
@@ -1005,11 +1017,23 @@ public class FalconCLI {
private void recipeCommand(CommandLine commandLine, FalconClient client) throws FalconCLIException {
String recipeName = commandLine.getOptionValue(RECIPE_NAME);
String recipeToolClass = commandLine.getOptionValue(RECIPE_TOOL_CLASS_NAME);
+ String recipeOperation = commandLine.getOptionValue(RECIPE_OPERATION);
validateNotEmpty(recipeName, RECIPE_NAME);
+ validateNotEmpty(recipeOperation, RECIPE_OPERATION);
+ validateRecipeOperations(recipeOperation);
- String result =
- client.submitRecipe(recipeName, recipeToolClass).getMessage();
+ String result = client.submitRecipe(recipeName, recipeToolClass, recipeOperation).toString();
OUT.get().println(result);
}
+
+ private static void validateRecipeOperations(String recipeOperation) throws FalconCLIException {
+ for (RecipeOperation operation : RecipeOperation.values()) {
+ if (operation.toString().equalsIgnoreCase(recipeOperation)) {
+ return;
+ }
+ }
+ throw new FalconCLIException("Allowed Recipe operations: "
+ + java.util.Arrays.asList(RecipeOperation.values()));
+ }
}
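For reference, an unsupported -operation value fails fast, and the message is built from the enum, so it stays current as operations are added:

    Allowed Recipe operations: [HDFS_REPLICATION, HIVE_DISASTER_RECOVERY]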
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index 9649e10..d9bdf64 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -963,7 +963,8 @@ public class FalconClient extends AbstractFalconClient {
}
public APIResult submitRecipe(String recipeName,
- String recipeToolClassName) throws FalconCLIException {
+ String recipeToolClassName,
+ final String recipeOperation) throws FalconCLIException {
String recipePath = clientProperties.getProperty("falcon.recipe.path");
if (StringUtils.isEmpty(recipePath)) {
@@ -999,6 +1000,7 @@ public class FalconClient extends AbstractFalconClient {
"-" + RecipeToolArgs.RECIPE_FILE_ARG.getName(), recipeFilePath,
"-" + RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG.getName(), propertiesFilePath,
"-" + RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG.getName(), processFile,
+ "-" + RecipeToolArgs.RECIPE_OPERATION_ARG.getName(), recipeOperation,
};
if (recipeToolClassName != null) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
new file mode 100644
index 0000000..cf24078
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeTool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Properties;
+import java.io.File;
+
+/**
+ * Hdfs Replication recipe tool for Falcon recipes.
+ */
+public class HdfsReplicationRecipeTool implements Recipe {
+
+ private static final String COMMA_SEPARATOR = ",";
+
+ @Override
+ public void validate(final Properties recipeProperties) {
+ for (HdfsReplicationRecipeToolOptions option : HdfsReplicationRecipeToolOptions.values()) {
+ if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+ throw new IllegalArgumentException("Missing argument: " + option.getName());
+ }
+ }
+ }
+
+ @Override
+ public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+ Properties additionalProperties = new Properties();
+
+ // Construct fully qualified hdfs src path
+ String srcPaths = recipeProperties.getProperty(HdfsReplicationRecipeToolOptions
+ .REPLICATION_SOURCE_DIR.getName());
+ StringBuilder absoluteSrcPaths = new StringBuilder();
+ String srcFsPath = recipeProperties.getProperty(
+ HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT.getName());
+ if (StringUtils.isNotEmpty(srcFsPath)) {
+ srcFsPath = StringUtils.removeEnd(srcFsPath, File.separator);
+ }
+ if (StringUtils.isNotEmpty(srcPaths)) {
+ String[] paths = srcPaths.split(COMMA_SEPARATOR);
+
+ for (String path : paths) {
+ StringBuilder srcpath = new StringBuilder(srcFsPath);
+ srcpath.append(path.trim());
+ srcpath.append(COMMA_SEPARATOR);
+ absoluteSrcPaths.append(srcpath);
+ }
+ }
+
+ additionalProperties.put(HdfsReplicationRecipeToolOptions.REPLICATION_SOURCE_DIR.getName(),
+ StringUtils.removeEnd(absoluteSrcPaths.toString(), COMMA_SEPARATOR));
+ return additionalProperties;
+ }
+}
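A quick worked example of the qualification above (input values are illustrative): with

    drSourceClusterFS=hdfs://localhost:8020
    drSourceDir=/data/in1, /data/in2

getAdditionalSystemProperties rewrites drSourceDir to

    hdfs://localhost:8020/data/in1,hdfs://localhost:8020/data/in2

i.e. a trailing separator is stripped from the endpoint, and each comma-separated path is trimmed and prefixed.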
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..4c3b543
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HdfsReplicationRecipeToolOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Hdfs Recipe tool options.
+ */
+public enum HdfsReplicationRecipeToolOptions {
+ REPLICATION_SOURCE_DIR("drSourceDir", "Location of source data to replicate"),
+ REPLICATION_SOURCE_CLUSTER_FS_WRITE_ENDPOINT("drSourceClusterFS", "Source replication cluster end point"),
+ REPLICATION_TARGET_DIR("drTargetDir", "Location on target cluster for replication"),
+ REPLICATION_TARGET_CLUSTER_FS_WRITE_ENDPOINT("drTargetClusterFS", "Target replication cluster end point"),
+ REPLICATION_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during replication"),
+ REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication");
+
+ private final String name;
+ private final String description;
+ private final boolean isRequired;
+
+ HdfsReplicationRecipeToolOptions(String name, String description) {
+ this(name, description, true);
+ }
+
+ HdfsReplicationRecipeToolOptions(String name, String description, boolean isRequired) {
+ this.name = name;
+ this.description = description;
+ this.isRequired = isRequired;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public boolean isRequired() {
+ return isRequired;
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
new file mode 100644
index 0000000..8b39673
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeTool.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatDatabase;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.api.ObjectNotFoundException;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatException;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Hive Replication recipe tool for Falcon recipes.
+ */
+public class HiveReplicationRecipeTool implements Recipe {
+ private static final String ALL_TABLES = "*";
+
+ @Override
+ public void validate(final Properties recipeProperties) throws Exception {
+ for (HiveReplicationRecipeToolOptions option : HiveReplicationRecipeToolOptions.values()) {
+ if (recipeProperties.getProperty(option.getName()) == null && option.isRequired()) {
+ throw new IllegalArgumentException("Missing argument: " + option.getName());
+ }
+ }
+
+ HCatClient sourceMetastoreClient = null;
+ HCatClient targetMetastoreClient = null;
+ try {
+ // Validate if DB exists - source and target
+ sourceMetastoreClient = getHiveMetaStoreClient(
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_SOURCE_METASTORE_URI.getName()),
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()));
+
+ String sourceDbList = recipeProperties.getProperty(
+ HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_DATABASE.getName());
+
+ if (StringUtils.isEmpty(sourceDbList)) {
+ throw new Exception("No source DB specified in property file");
+ }
+
+ String sourceTableList = recipeProperties.getProperty(
+ HiveReplicationRecipeToolOptions.REPLICATION_SOURCE_TABLE.getName());
+ if (StringUtils.isEmpty(sourceTableList)) {
+ throw new Exception("No source table specified in property file. For DB replication please specify * "
+ + "for sourceTable");
+ }
+
+ String[] srcDbs = sourceDbList.split(",");
+ if (srcDbs.length <= 0) {
+ throw new Exception("No source DB specified in property file");
+ }
+ for (String db : srcDbs) {
+ if (!dbExists(sourceMetastoreClient, db)) {
+ throw new Exception("Database " + db + " doesn't exist on source cluster");
+ }
+ }
+
+ if (!sourceTableList.equals(ALL_TABLES)) {
+ String[] srcTables = sourceTableList.split(",");
+ if (srcTables.length > 0) {
+ for (String table : srcTables) {
+ if (!tableExists(sourceMetastoreClient, srcDbs[0], table)) {
+ throw new Exception("Table " + table + " doesn't exist on source cluster");
+ }
+ }
+ }
+ }
+
+ targetMetastoreClient = getHiveMetaStoreClient(
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_TARGET_METASTORE_URI.getName()),
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName()),
+ recipeProperties.getProperty(HiveReplicationRecipeToolOptions
+ .REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()));
+ // Verify db exists on target
+ for (String db : srcDbs) {
+ if (!dbExists(targetMetastoreClient, db)) {
+ throw new Exception("Database " + db + " doesn't exist on target cluster");
+ }
+ }
+ } finally {
+ if (sourceMetastoreClient != null) {
+ sourceMetastoreClient.close();
+ }
+ if (targetMetastoreClient != null) {
+ targetMetastoreClient.close();
+ }
+ }
+ }
+
+ @Override
+ public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
+ Properties additionalProperties = new Properties();
+ String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
+ // Add recipe name as Hive DR job
+ additionalProperties.put(HiveReplicationRecipeToolOptions.HIVE_DR_JOB_NAME.getName(), recipeName);
+ additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN.getName(),
+ recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName()));
+ additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(),
+ recipeProperties.getProperty(RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName()));
+ if (StringUtils.isNotEmpty(recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()))) {
+ additionalProperties.put(HiveReplicationRecipeToolOptions.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
+ recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName()));
+ }
+ return additionalProperties;
+ }
+
+ private HCatClient getHiveMetaStoreClient(String metastoreUrl, String metastorePrincipal,
+ String hive2Principal) throws Exception {
+ try {
+ HiveConf hcatConf = createHiveConf(new Configuration(false), metastoreUrl,
+ metastorePrincipal, hive2Principal);
+ return HCatClient.create(hcatConf);
+ } catch (IOException e) {
+ throw new Exception("Exception creating HCatClient: " + e.getMessage(), e);
+ }
+ }
+
+ private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal,
+ String hive2Principal) throws IOException {
+ HiveConf hcatConf = new HiveConf(conf, HiveConf.class);
+
+ hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl);
+ hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+ hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ HCatSemanticAnalyzer.class.getName());
+ hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+
+ hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+ hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+ if (StringUtils.isNotEmpty(metastorePrincipal)) {
+ hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal);
+ hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true");
+ hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true");
+ }
+ if (StringUtils.isNotEmpty(hive2Principal)) {
+ hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal);
+ hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos");
+ }
+
+ return hcatConf;
+ }
+
+ private static boolean tableExists(HCatClient client, final String database, final String tableName)
+ throws Exception {
+ try {
+ HCatTable table = client.getTable(database, tableName);
+ return table != null;
+ } catch (ObjectNotFoundException e) {
+ System.out.println(e.getMessage());
+ return false;
+ } catch (HCatException e) {
+ throw new Exception("Exception checking if the table exists:" + e.getMessage(), e);
+ }
+ }
+
+ private static boolean dbExists(HCatClient client, final String database)
+ throws Exception {
+ try {
+ HCatDatabase db = client.getDatabase(database);
+ return db != null;
+ } catch (ObjectNotFoundException e) {
+ System.out.println(e.getMessage());
+ return false;
+ } catch (HCatException e) {
+ throw new Exception("Exception checking if the db exists:" + e.getMessage(), e);
+ }
+ }
+}
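As a standalone sketch of driving this validation outside the recipe tool (illustrative snippet, not part of this patch; validate throws Exception, so the caller must declare or handle it):

    import java.io.FileInputStream;
    import java.util.Properties;

    Properties props = new Properties();
    props.load(new FileInputStream("hive-disaster-recovery.properties"));
    new HiveReplicationRecipeTool().validate(props); // connects to both metastores and checks the DBs/tables exist

Note that the property keys expected here (sourceMetastoreUri, sourceDatabase, and so on) are the un-prefixed names from the custom-properties section of the template above.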
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
new file mode 100644
index 0000000..ec0465d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/HiveReplicationRecipeToolOptions.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+/**
+ * Hive Recipe tool options.
+ */
+public enum HiveReplicationRecipeToolOptions {
+ REPLICATION_SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
+ REPLICATION_SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri"),
+ REPLICATION_SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
+ REPLICATION_SOURCE_DATABASE("sourceDatabase", "List of databases to replicate"),
+ REPLICATION_SOURCE_TABLE("sourceTable", "List of tables to replicate"),
+ REPLICATION_SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path"),
+ REPLICATION_SOURCE_NN("sourceNN", "Source name node"),
+ REPLICATION_SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false),
+ REPLICATION_SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
+ "Source hive metastore kerberos principal", false),
+ REPLICATION_SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal",
+ "Source hiveserver2 kerberos principal", false),
+
+ REPLICATION_TARGET_CLUSTER("targetCluster", "Replication target cluster name"),
+ REPLICATION_TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri"),
+ REPLICATION_TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
+ REPLICATION_TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path"),
+ REPLICATION_TARGET_NN("targetNN", "Target name node"),
+ REPLICATION_TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false),
+ REPLICATION_TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
+ "Target hive metastore kerberos principal", false),
+ REPLICATION_TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal",
+ "Target hiveserver2 kerberos principal", false),
+
+ REPLICATION_MAX_EVENTS("maxEvents", "Maximum events to replicate"),
+ REPLICATION_MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication"),
+ DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp"),
+ REPLICATION_MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"),
+ CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
+ CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
+ "Namenode Kerberos principal of cluster on which replication job runs", false),
+ CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
+ HIVE_DR_JOB_NAME("drJobName", "Unique hive DR job name", false);
+
+ private final String name;
+ private final String description;
+ private final boolean isRequired;
+
+ HiveReplicationRecipeToolOptions(String name, String description) {
+ this(name, description, true);
+ }
+
+ HiveReplicationRecipeToolOptions(String name, String description, boolean isRequired) {
+ this.name = name;
+ this.description = description;
+ this.isRequired = isRequired;
+ }
+
+ public String getName() {
+ return this.name;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public boolean isRequired() {
+ return isRequired;
+ }
+
+ @Override
+ public String toString() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/Recipe.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/Recipe.java b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
new file mode 100644
index 0000000..609131d
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/Recipe.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import java.util.Properties;
+
+/**
+ * Recipe interface.
+ */
+public interface Recipe {
+ void validate(final Properties recipeProperties) throws Exception;
+ Properties getAdditionalSystemProperties(final Properties recipeProperties);
+}
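A minimal implementation of this contract would look like the following (illustrative only, not part of this patch; assumes the same package and a java.util.Properties import):

    public class NoOpRecipeTool implements Recipe {
        @Override
        public void validate(final Properties recipeProperties) {
            // nothing to validate
        }

        @Override
        public Properties getAdditionalSystemProperties(final Properties recipeProperties) {
            return new Properties(); // injects no extra properties
        }
    }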
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
new file mode 100644
index 0000000..32b0871
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeFactory.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe;
+
+import org.apache.falcon.cli.FalconCLI.RecipeOperation;
+
+/**
+ * Recipe factory.
+ */
+public final class RecipeFactory {
+
+ private RecipeFactory() {
+ }
+
+ public static Recipe getRecipeToolType(String recipeType) {
+ if (recipeType == null) {
+ return null;
+ }
+
+ if (RecipeOperation.HDFS_REPLICATION.toString().equalsIgnoreCase(recipeType)) {
+ return new HdfsReplicationRecipeTool();
+ } else if (RecipeOperation.HIVE_DISASTER_RECOVERY.toString().equalsIgnoreCase(recipeType)) {
+ return new HiveReplicationRecipeTool();
+ }
+ return null;
+ }
+
+}
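Lookup is case-insensitive and unknown types yield null:

    Recipe hiveRecipe = RecipeFactory.getRecipeToolType("hive_disaster_recovery"); // -> HiveReplicationRecipeTool
    Recipe unknown = RecipeFactory.getRecipeToolType("something-else");            // -> null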
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
index 069db9f..243ff4d 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeTool.java
@@ -19,44 +19,44 @@
package org.apache.falcon.recipe;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.commons.cli.Options;
-import java.io.BufferedReader;
import java.io.File;
-import java.io.InputStream;
-import java.io.IOException;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.OutputStream;
-import java.util.Map;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
/**
* Base recipe tool for Falcon recipes.
*/
public class RecipeTool extends Configured implements Tool {
private static final String HDFS_WF_PATH = "falcon" + File.separator + "recipes" + File.separator;
- private static final String RECIPE_PREFIX = "falcon.recipe.";
- private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
-
- private FileSystem hdfsFileSystem;
+ private static final FsPermission FS_PERMISSION =
+ new FsPermission(FsAction.ALL, FsAction.READ, FsAction.NONE);
+ private static final String FS_DEFAULT_NAME_KEY = "fs.defaultFS";
+ private static final String NN_PRINCIPAL = "dfs.namenode.kerberos.principal";
public static void main(String[] args) throws Exception {
ToolRunner.run(new Configuration(), new RecipeTool(), args);
@@ -64,25 +64,38 @@ public class RecipeTool extends Configured implements Tool {
@Override
public int run(String[] arguments) throws Exception {
+
Map<RecipeToolArgs, String> argMap = setupArgs(arguments);
if (argMap == null || argMap.isEmpty()) {
throw new Exception("Arguments passed to recipe is null");
}
-
+ Configuration conf = getConf();
String recipePropertiesFilePath = argMap.get(RecipeToolArgs.RECIPE_PROPERTIES_FILE_ARG);
Properties recipeProperties = loadProperties(recipePropertiesFilePath);
validateProperties(recipeProperties);
- FileSystem fs = getFileSystemForHdfs(recipeProperties);
+ String recipeOperation = argMap.get(RecipeToolArgs.RECIPE_OPERATION_ARG);
+ Recipe recipeType = RecipeFactory.getRecipeToolType(recipeOperation);
+ if (recipeType != null) {
+ recipeType.validate(recipeProperties);
+ Properties props = recipeType.getAdditionalSystemProperties(recipeProperties);
+ if (props != null && !props.isEmpty()) {
+ recipeProperties.putAll(props);
+ }
+ }
+ String processFilename;
+
+ FileSystem fs = getFileSystemForHdfs(recipeProperties, conf);
validateArtifacts(recipeProperties, fs);
- String recipeName = FilenameUtils.getBaseName(recipePropertiesFilePath);
+ String recipeName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
copyFilesToHdfsIfRequired(recipeProperties, fs, recipeName);
- Map<String, String> overlayMap = getOverlay(recipeProperties);
- String processFilename = overlayParametersOverTemplate(argMap.get(RecipeToolArgs.RECIPE_FILE_ARG),
- argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG), overlayMap);
+ processFilename = RecipeProcessBuilderUtils.createProcessFromTemplate(argMap.get(RecipeToolArgs
+ .RECIPE_FILE_ARG), recipeProperties, argMap.get(RecipeToolArgs.RECIPE_PROCESS_XML_FILE_PATH_ARG));
+
+
System.out.println("Generated process file to be scheduled: ");
System.out.println(FileUtils.readFileToString(new File(processFilename)));
@@ -98,7 +111,7 @@ public class RecipeTool extends Configured implements Tool {
addOption(options, arg, arg.isRequired());
}
- CommandLine cmd = new GnuParser().parse(options, arguments);
+ CommandLine cmd = new GnuParser().parse(options, arguments);
for (RecipeToolArgs arg : RecipeToolArgs.values()) {
String optionValue = arg.getOptionValue(cmd);
if (StringUtils.isNotEmpty(optionValue)) {
@@ -135,7 +148,7 @@ public class RecipeTool extends Configured implements Tool {
}
}
- private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception{
+ private static void validateArtifacts(final Properties recipeProperties, final FileSystem fs) throws Exception {
// validate the WF path
String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
@@ -156,53 +169,6 @@ public class RecipeTool extends Configured implements Tool {
}
}
- private static Map<String, String> getOverlay(final Properties recipeProperties) {
- Map<String, String> overlay = new HashMap<String, String>();
- for (Map.Entry<Object, Object> entry : recipeProperties.entrySet()) {
- String key = StringUtils.removeStart((String) entry.getKey(), RECIPE_PREFIX);
- overlay.put(key, (String) entry.getValue());
- }
-
- return overlay;
- }
-
- private static String overlayParametersOverTemplate(final String templateFile,
- final String outFilename,
- Map<String, String> overlay) throws Exception {
- if (templateFile == null || outFilename == null || overlay == null || overlay.isEmpty()) {
- throw new IllegalArgumentException("Invalid arguments passed");
- }
-
- String line;
- OutputStream out = null;
- BufferedReader reader = null;
-
- try {
- out = new FileOutputStream(outFilename);
-
- reader = new BufferedReader(new FileReader(templateFile));
- while ((line = reader.readLine()) != null) {
- Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
- while (matcher.find()) {
- String variable = line.substring(matcher.start(), matcher.end());
- String paramString = overlay.get(variable.substring(2, variable.length() - 2));
- if (paramString == null) {
- throw new Exception("Match not found for the template: " + variable
- + ". Please add it in recipe properties file");
- }
- line = line.replace(variable, paramString);
- matcher = RECIPE_VAR_PATTERN.matcher(line);
- }
- out.write(line.getBytes());
- out.write("\n".getBytes());
- }
- } finally {
- IOUtils.closeQuietly(reader);
- IOUtils.closeQuietly(out);
- }
- return outFilename;
- }
-
private static void copyFilesToHdfsIfRequired(final Properties recipeProperties,
final FileSystem fs,
final String recipeName) throws Exception {
@@ -262,7 +228,7 @@ public class RecipeTool extends Configured implements Tool {
private static void createDirOnHdfs(String path, FileSystem fs) throws IOException {
Path hdfsPath = new Path(path);
if (!fs.exists(hdfsPath)) {
- fs.mkdirs(hdfsPath);
+ FileSystem.mkdirs(fs, hdfsPath, FS_PERMISSION);
}
}
@@ -287,19 +253,33 @@ public class RecipeTool extends Configured implements Tool {
fs.copyFromLocalFile(false, true, new Path(localFilePath), new Path(hdfsFilePath));
}
- private static Configuration getConfiguration(final String storageEndpoint) throws Exception {
- Configuration conf = new Configuration();
- conf.set("fs.defaultFS", storageEndpoint);
- return conf;
+ private FileSystem getFileSystemForHdfs(final Properties recipeProperties,
+ final Configuration conf) throws Exception {
+ String storageEndpoint = RecipeToolOptions.CLUSTER_HDFS_WRITE_ENDPOINT.getName();
+ String nameNode = recipeProperties.getProperty(storageEndpoint);
+ conf.set(FS_DEFAULT_NAME_KEY, nameNode);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ String nameNodePrincipal = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NN_PRINCIPAL.getName());
+ conf.set(NN_PRINCIPAL, nameNodePrincipal);
+ }
+ return createFileSystem(UserGroupInformation.getLoginUser(), new URI(nameNode), conf);
}
- private FileSystem getFileSystemForHdfs(final Properties recipeProperties) throws Exception {
- if (hdfsFileSystem == null) {
- String storageEndpoint = RecipeToolOptions.SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT.getName();
- hdfsFileSystem = FileSystem.get(
- getConfiguration(recipeProperties.getProperty(storageEndpoint)));
- }
+ private FileSystem createFileSystem(UserGroupInformation ugi, final URI uri,
+ final Configuration conf) throws Exception {
+ try {
+ final String proxyUserName = ugi.getShortUserName();
+ if (proxyUserName.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
+ return FileSystem.get(uri, conf);
+ }
- return hdfsFileSystem;
+ return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ public FileSystem run() throws Exception {
+ return FileSystem.get(uri, conf);
+ }
+ });
+ } catch (InterruptedException ex) {
+ throw new IOException("Exception creating FileSystem: " + ex.getMessage(), ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
index baa4846..79d8f18 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolArgs.java
@@ -25,10 +25,11 @@ import org.apache.commons.cli.Option;
* Recipe tool args.
*/
public enum RecipeToolArgs {
- RECIPE_FILE_ARG("file", "recipe file path"),
+ RECIPE_FILE_ARG("file", "recipe template file path"),
RECIPE_PROPERTIES_FILE_ARG("propertiesFile", "recipe properties file path"),
RECIPE_PROCESS_XML_FILE_PATH_ARG(
- "recipeProcessFilePath", "file path of recipe process to be submitted");
+ "recipeProcessFilePath", "file path of recipe process to be submitted"),
+ RECIPE_OPERATION_ARG("recipeOperation", "recipe operation");
private final String name;
private final String description;
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
index a1c29cd..5df9b0a 100644
--- a/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
+++ b/client/src/main/java/org/apache/falcon/recipe/RecipeToolOptions.java
@@ -18,19 +18,43 @@
package org.apache.falcon.recipe;
+import java.util.Map;
+import java.util.HashMap;
+
/**
* Recipe tool options.
*/
public enum RecipeToolOptions {
- SOURCE_CLUSTER_HDFS_WRITE_ENDPOINT(
- "falcon.recipe.src.cluster.hdfs.writeEndPoint", "source cluster HDFS write endpoint"),
+ RECIPE_NAME("falcon.recipe.name", "Recipe name", false),
+ CLUSTER_NAME("falcon.recipe.cluster.name", "Cluster name where replication job should run", false),
+ CLUSTER_HDFS_WRITE_ENDPOINT(
+ "falcon.recipe.cluster.hdfs.writeEndPoint", "Cluster HDFS write endpoint"),
+ CLUSTER_VALIDITY_START("falcon.recipe.cluster.validity.start", "Source cluster validity start", false),
+ CLUSTER_VALIDITY_END("falcon.recipe.cluster.validity.end", "Source cluster validity end", false),
+ WORKFLOW_NAME("falcon.recipe.workflow.name", "Workflow name", false),
WORKFLOW_PATH("falcon.recipe.workflow.path", "Workflow path", false),
- WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false);
+ WORKFLOW_LIB_PATH("falcon.recipe.workflow.lib.path", "WF lib path", false),
+ PROCESS_FREQUENCY("falcon.recipe.process.frequency", "Process frequency", false),
+ RETRY_POLICY("falcon.recipe.retry.policy", "Retry policy", false),
+ RETRY_DELAY("falcon.recipe.retry.delay", "Retry delay", false),
+ RETRY_ATTEMPTS("falcon.recipe.retry.attempts", "Retry attempts", false),
+ RECIPE_TAGS("falcon.recipe.tags", "Recipe tags", false),
+ RECIPE_ACL_OWNER("falcon.recipe.acl.owner", "Recipe acl owner", false),
+ RECIPE_ACL_GROUP("falcon.recipe.acl.group", "Recipe acl group", false),
+ RECIPE_ACL_PERMISSION("falcon.recipe.acl.permission", "Recipe acl permission", false),
+ RECIPE_NN_PRINCIPAL("falcon.recipe.nn.principal", "Recipe DFS NN principal", false);
private final String name;
private final String description;
private final boolean isRequired;
+ public static final Map<String, RecipeToolOptions> OPTIONSMAP = new HashMap<>();
+ static {
+ for (RecipeToolOptions c : RecipeToolOptions.values()) {
+ OPTIONSMAP.put(c.getName(), c);
+ }
+ }
+
RecipeToolOptions(String name, String description) {
this(name, description, true);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
new file mode 100644
index 0000000..9522816
--- /dev/null
+++ b/client/src/main/java/org/apache/falcon/recipe/util/RecipeProcessBuilderUtils.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.recipe.util;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Retry;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.recipe.RecipeToolOptions;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.ValidationEvent;
+import javax.xml.bind.ValidationEventHandler;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Recipe builder utility.
+ */
+public final class RecipeProcessBuilderUtils {
+
+ private static final Pattern RECIPE_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");
+
+ private RecipeProcessBuilderUtils() {
+ }
+
+ public static String createProcessFromTemplate(final String processTemplateFile, final Properties recipeProperties,
+ final String processFilename) throws Exception {
+ org.apache.falcon.entity.v0.process.Process process = bindAttributesInTemplate(
+ processTemplateFile, recipeProperties);
+ String recipeProcessFilename = createProcessXmlFile(processFilename, process);
+
+ validateProcessXmlFile(recipeProcessFilename);
+ return recipeProcessFilename;
+ }
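For orientation, a hedged sketch of how this entry point might be driven; the file names here are
illustrative, not taken from this patch:

    import java.io.FileReader;
    import java.util.Properties;

    import org.apache.falcon.recipe.util.RecipeProcessBuilderUtils;

    public class RecipeBuildSketch {
        public static void main(String[] args) throws Exception {
            Properties recipeProperties = new Properties();
            try (FileReader in = new FileReader("hive-disaster-recovery.properties")) {
                recipeProperties.load(in);
            }
            // Binds known recipe options onto the unmarshalled template, copies
            // unrecognized keys through as custom process properties, and throws
            // if any ##...## placeholder is left unresolved after marshalling.
            String processXml = RecipeProcessBuilderUtils.createProcessFromTemplate(
                    "hive-disaster-recovery-template.xml",
                    recipeProperties,
                    "/tmp/hive-dr-process.xml");
            System.out.println("Process entity written to " + processXml);
        }
    }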
+
+ private static org.apache.falcon.entity.v0.process.Process
+ bindAttributesInTemplate(final String templateFile, final Properties recipeProperties)
+ throws Exception {
+ if (templateFile == null || recipeProperties == null) {
+ throw new IllegalArgumentException("Invalid arguments passed");
+ }
+
+ Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
+ // Validation can be skipped for unmarshalling as we want to bind the template with the properties.
+ // Validation is handled as part of marshalling.
+ unmarshaller.setSchema(null);
+ unmarshaller.setEventHandler(new ValidationEventHandler() {
+ public boolean handleEvent(ValidationEvent validationEvent) {
+ return true;
+ }
+ }
+ );
+
+ URL processResourceUrl = new File(templateFile).toURI().toURL();
+ org.apache.falcon.entity.v0.process.Process process =
+ (org.apache.falcon.entity.v0.process.Process) unmarshaller.unmarshal(processResourceUrl);
+
+ /* Users may set optional properties directly in the process xml rather than in the properties file.
+ Before submission, validation confirms that the process xml contains no unresolved RECIPE_VAR_PATTERN
+ placeholders. */
+
+ String processName = recipeProperties.getProperty(RecipeToolOptions.RECIPE_NAME.getName());
+ if (StringUtils.isNotEmpty(processName)) {
+ process.setName(processName);
+ }
+
+ // DR process template has only one cluster
+ bindClusterProperties(process.getClusters().getClusters().get(0), recipeProperties);
+
+ // bind scheduling properties
+ String processFrequency = recipeProperties.getProperty(RecipeToolOptions.PROCESS_FREQUENCY.getName());
+ if (StringUtils.isNotEmpty(processFrequency)) {
+ process.setFrequency(Frequency.fromString(processFrequency));
+ }
+
+ bindWorkflowProperties(process.getWorkflow(), recipeProperties);
+ bindRetryProperties(process.getRetry(), recipeProperties);
+ bindACLProperties(process.getACL(), recipeProperties);
+ bindTagsProperties(process, recipeProperties);
+ bindCustomProperties(process.getProperties(), recipeProperties);
+
+ return process;
+ }
+
+ private static void bindClusterProperties(final Cluster cluster,
+ final Properties recipeProperties) {
+ // DR process template has only one cluster
+ String clusterName = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_NAME.getName());
+ if (StringUtils.isNotEmpty(clusterName)) {
+ cluster.setName(clusterName);
+ }
+
+ String clusterStartValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_START.getName());
+ if (StringUtils.isNotEmpty(clusterStartValidity)) {
+ cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity));
+ }
+
+ String clusterEndValidity = recipeProperties.getProperty(RecipeToolOptions.CLUSTER_VALIDITY_END.getName());
+ if (StringUtils.isNotEmpty(clusterEndValidity)) {
+ cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity));
+ }
+ }
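The validity bounds above go through SchemaHelper.parseDateUTC, i.e. Falcon's UTC timestamp convention. A
minimal sketch of equivalent parsing, assuming the yyyy-MM-dd'T'HH:mm'Z' layout Falcon entities use for
validity dates (both the format and the sample key name below are assumptions for illustration):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.TimeZone;

    public class ValidityDateSketch {
        public static void main(String[] args) throws Exception {
            // Assumed convention, e.g. a properties entry like
            // falcon.recipe.cluster.validity.start=2015-08-11T10:00Z
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'");
            fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
            Date start = fmt.parse("2015-08-11T10:00Z");
            System.out.println(start);
        }
    }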
+
+ private static void bindWorkflowProperties(final Workflow wf,
+ final Properties recipeProperties) {
+ String wfName = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_NAME.getName());
+ if (StringUtils.isNotEmpty(wfName)) {
+ wf.setName(wfName);
+ }
+
+ String wfLibPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_LIB_PATH.getName());
+ if (StringUtils.isNotEmpty(wfLibPath)) {
+ wf.setLib(wfLibPath);
+ } else if (wf.getLib().startsWith("##")) {
+ wf.setLib("");
+ }
+
+ String wfPath = recipeProperties.getProperty(RecipeToolOptions.WORKFLOW_PATH.getName());
+ if (StringUtils.isNotEmpty(wfPath)) {
+ wf.setPath(wfPath);
+ }
+ }
+
+ private static void bindRetryProperties(final Retry processRetry,
+ final Properties recipeProperties) {
+ String retryPolicy = recipeProperties.getProperty(RecipeToolOptions.RETRY_POLICY.getName());
+ if (StringUtils.isNotEmpty(retryPolicy)) {
+ processRetry.setPolicy(PolicyType.fromValue(retryPolicy));
+ }
+
+ String retryAttempts = recipeProperties.getProperty(RecipeToolOptions.RETRY_ATTEMPTS.getName());
+ if (StringUtils.isNotEmpty(retryAttempts)) {
+ processRetry.setAttempts(Integer.parseInt(retryAttempts));
+ }
+
+ String retryDelay = recipeProperties.getProperty(RecipeToolOptions.RETRY_DELAY.getName());
+ if (StringUtils.isNotEmpty(retryDelay)) {
+ processRetry.setDelay(Frequency.fromString(retryDelay));
+ }
+ }
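The three retry keys bound above are the constants declared earlier in RecipeToolOptions; the value syntax
below is illustrative (Falcon frequencies use forms like minutes(n), and "periodic" is assumed to be one of
the accepted retry policy values):

    import java.util.Properties;

    public class RetryPropertiesSketch {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.setProperty("falcon.recipe.retry.policy", "periodic");   // parsed via PolicyType.fromValue
            p.setProperty("falcon.recipe.retry.delay", "minutes(30)"); // parsed via Frequency.fromString
            p.setProperty("falcon.recipe.retry.attempts", "3");        // parsed via Integer.parseInt
            System.out.println(p);
        }
    }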
+
+ private static void bindACLProperties(final ACL acl,
+ final Properties recipeProperties) {
+ String aclowner = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_OWNER.getName());
+ if (StringUtils.isNotEmpty(aclowner)) {
+ acl.setOwner(aclowner);
+ }
+
+ String aclGroup = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_GROUP.getName());
+ if (StringUtils.isNotEmpty(aclGroup)) {
+ acl.setGroup(aclGroup);
+ }
+
+ String aclPermission = recipeProperties.getProperty(RecipeToolOptions.RECIPE_ACL_PERMISSION.getName());
+ if (StringUtils.isNotEmpty(aclPermission)) {
+ acl.setPermission(aclPermission);
+ }
+ }
+
+ private static void bindTagsProperties(final org.apache.falcon.entity.v0.process.Process process,
+ final Properties recipeProperties) {
+ String falconSystemTags = process.getTags();
+ String tags = recipeProperties.getProperty(RecipeToolOptions.RECIPE_TAGS.getName());
+ if (StringUtils.isNotEmpty(tags)) {
+ if (StringUtils.isNotEmpty(falconSystemTags)) {
+ tags += ", " + falconSystemTags;
+ }
+ process.setTags(tags);
+ }
+ }
+
+ private static void bindCustomProperties(final org.apache.falcon.entity.v0.process.Properties customProperties,
+ final Properties recipeProperties) {
+ List<Property> propertyList = new ArrayList<>();
+
+ for (Map.Entry<Object, Object> recipeProperty : recipeProperties.entrySet()) {
+ if (RecipeToolOptions.OPTIONSMAP.get(recipeProperty.getKey().toString()) == null) {
+ addProperty(propertyList, (String) recipeProperty.getKey(), (String) recipeProperty.getValue());
+ }
+ }
+
+ customProperties.getProperties().addAll(propertyList);
+ }
+
+ private static void addProperty(List<Property> propertyList, String name, String value) {
+ Property prop = new Property();
+ prop.setName(name);
+ prop.setValue(value);
+ propertyList.add(prop);
+ }
+
+ private static String createProcessXmlFile(final String outFilename,
+ final Entity entity) throws Exception {
+ if (outFilename == null || entity == null) {
+ throw new IllegalArgumentException("Invalid arguments passed");
+ }
+
+ EntityType type = EntityType.PROCESS;
+ OutputStream out = null;
+ try {
+ out = new FileOutputStream(outFilename);
+ type.getMarshaller().marshal(entity, out);
+ } catch (JAXBException e) {
+ throw new Exception("Unable to serialize the entity object " + type + "/" + entity.getName(), e);
+ } finally {
+ IOUtils.closeQuietly(out);
+ }
+ return outFilename;
+ }
+
+ private static void validateProcessXmlFile(final String processFileName) throws Exception {
+ if (processFileName == null) {
+ throw new IllegalArgumentException("Invalid arguments passed");
+ }
+
+ String line;
+ BufferedReader reader = null;
+
+ try {
+ reader = new BufferedReader(new FileReader(processFileName));
+ while ((line = reader.readLine()) != null) {
+ Matcher matcher = RECIPE_VAR_PATTERN.matcher(line);
+ if (matcher.find()) {
+ String variable = line.substring(matcher.start(), matcher.end());
+ throw new Exception("Match not found for the template: " + variable
+ + " in recipe template file. Please add it in recipe properties file");
+ }
+ }
+ } finally {
+ IOUtils.closeQuietly(reader);
+ }
+
+ }
+
+}
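The final validation pass above is a line-by-line scan of the marshalled xml for unresolved placeholders. A
standalone sketch of what RECIPE_VAR_PATTERN matches; the sample lines and the property name are invented
for illustration:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RecipeVarPatternSketch {
        // Same expression as RECIPE_VAR_PATTERN: '##', then letters/digits/underscores/dots, then '##'.
        private static final Pattern P = Pattern.compile("##[A-Za-z0-9_.]*##");

        public static void main(String[] args) {
            String bound   = "<cluster name=\"primaryCluster\"/>";                 // passes validation
            String unbound = "<cluster name=\"##falcon.recipe.cluster.name##\"/>"; // would fail validation
            for (String line : new String[]{bound, unbound}) {
                Matcher m = P.matcher(line);
                if (m.find()) {
                    System.out.println("Unresolved: " + line.substring(m.start(), m.end()));
                }
            }
        }
    }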
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/docs/src/site/twiki/InstallationSteps.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/InstallationSteps.twiki b/docs/src/site/twiki/InstallationSteps.twiki
index 3dd034b..90d765b 100644
--- a/docs/src/site/twiki/InstallationSteps.twiki
+++ b/docs/src/site/twiki/InstallationSteps.twiki
@@ -31,6 +31,8 @@ It builds and installs the package into the local repository, for use as a depen
[optionally -Doozie.version=<<oozie version>> can be appended to build with a specific version of Oozie. Oozie versions
>= 4 are supported]
NOTE: Falcon builds with JDK 1.7 using -noverify option
+ To compile Falcon with Hive Replication, optionally append "-P hadoop-2,hivedr". This requires Hive >= 1.2.0
+ and Oozie >= 4.2.0 to be available.
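For example, assuming the standard mvn clean install build described earlier in this file (the version flags
are illustrative and name the hive.version/oozie.version properties checked by the hivedr profile's enforcer
rules below):

    mvn clean install -Phadoop-2,hivedr -Dhive.version=1.2.0 -Doozie.version=4.2.0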
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index e3de6a4..49fb4f7 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Properties;
/**
@@ -47,10 +48,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
private static final String[] LIBS = StartupProperties.get().getProperty("shared.libs").split(",");
- private static final FalconPathFilter NON_FALCON_JAR_FILTER = new FalconPathFilter() {
+ private static class FalconLibPath implements FalconPathFilter {
+ private String[] shareLibs;
+ FalconLibPath(String[] libList) {
+ this.shareLibs = Arrays.copyOf(libList, libList.length);
+ }
+
@Override
public boolean accept(Path path) {
- for (String jarName : LIBS) {
+ for (String jarName : shareLibs) {
if (path.getName().startsWith(jarName)) {
return true;
}
@@ -60,7 +66,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
@Override
public String getJarName(Path path) {
- for (String jarName : LIBS) {
+ for (String jarName : shareLibs) {
if (path.getName().startsWith(jarName)) {
return jarName;
}
@@ -84,9 +90,10 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
"lib");
Path libext = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
"libext");
+ FalconPathFilter nonFalconJarFilter = new FalconLibPath(LIBS);
Properties properties = StartupProperties.get();
pushLibsToHDFS(fs, properties.getProperty("system.lib.location"), lib,
- NON_FALCON_JAR_FILTER);
+ nonFalconJarFilter);
pushLibsToHDFS(fs, properties.getProperty("libext.paths"), libext, null);
pushLibsToHDFS(fs, properties.getProperty("libext.feed.paths"),
new Path(libext, EntityType.FEED.name()) , null);
@@ -107,7 +114,6 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
if (StringUtils.isEmpty(src)) {
return;
}
-
LOG.debug("Copying libs from {}", src);
createTargetPath(fs, target);
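The change above replaces the anonymous filter hard-wired to the static LIBS array with a FalconLibPath
instance that can be built over any lib list. A simplified, self-contained sketch of the same pattern; the
interface and jar names here are stand-ins, not Falcon's actual FalconPathFilter or Path types:

    import java.util.Arrays;

    public class LibPathFilterSketch {
        interface PathFilter {
            boolean accept(String fileName);
        }

        // Parameterized over jar-name prefixes instead of a shared static array, so
        // different callers can filter against different shared-lib lists.
        static class LibPrefixFilter implements PathFilter {
            private final String[] prefixes;

            LibPrefixFilter(String[] prefixes) {
                // Defensive copy, as in FalconLibPath's constructor.
                this.prefixes = Arrays.copyOf(prefixes, prefixes.length);
            }

            @Override
            public boolean accept(String fileName) {
                for (String prefix : prefixes) {
                    if (fileName.startsWith(prefix)) {
                        return true;
                    }
                }
                return false;
            }
        }

        public static void main(String[] args) {
            PathFilter filter = new LibPrefixFilter(new String[]{"falcon-oozie", "falcon-hadoop-dependencies"});
            System.out.println(filter.accept("falcon-oozie-el-extension.jar")); // true
            System.out.println(filter.accept("guava.jar"));                     // false
        }
    }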
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 34a5471..ec04bdf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -130,7 +130,7 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<scope>provided</scope>
- <exclusions>
+ <exclusions>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
@@ -151,12 +151,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
</exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -166,10 +166,10 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -179,10 +179,10 @@
</exclusions>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
- <version>${hadoop.version}</version>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
@@ -200,18 +200,18 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <classifier>tests</classifier>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <classifier>tests</classifier>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
- <version>${hadoop.version}</version>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
@@ -241,6 +241,13 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>
@@ -255,7 +262,7 @@
<descriptors>
<descriptor>src/main/assemblies/distributed-package.xml</descriptor>
<descriptor>src/main/assemblies/src-package.xml</descriptor>
- </descriptors>
+ </descriptors>
<finalName>apache-falcon-distributed-${project.version}</finalName>
</configuration>
</plugin>
@@ -368,11 +375,47 @@
<properties>
<excluded.test.groups/>
</properties>
- </profile>
+ </profile>
+ <profile>
+ <id>hivedr</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.3.1</version>
+ <executions>
+ <execution>
+ <id>enforce-property</id>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ <configuration>
+ <rules>
+ <requireProperty>
+ <property>hive.version</property>
+ <regex>^(1\.2.*)</regex>
+ <regexMessage>HiveDR only supports hive version >= 1.2.0</regexMessage>
+ <property>oozie.version</property>
+ <regex>^(4\.2.*)</regex>
+ <regexMessage>HiveDR only supports oozie version >= 4.2.0</regexMessage>
+ </requireProperty>
+ </rules>
+ <fail>true</fail>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <modules>
+ <module>addons/hivedr</module>
+ </modules>
+ </profile>
</profiles>
<modules>
- <module>falcon-ui</module>
+ <module>falcon-ui</module>
<module>checkstyle</module>
<module>build-tools</module>
<module>client</module>
@@ -882,12 +925,15 @@
<!-- this is needed for embedded oozie -->
<dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-webhcat-java-client</artifactId>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
- <!-- This implies you cannot use orc files -->
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</exclusion>
@@ -907,6 +953,18 @@
</dependency>
<dependency>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-webhcat-java-client</artifactId>
+ <version>${hive.version}</version>
+ <exclusions>
+ <exclusion> <!-- conflict with hadoop-auth -->
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>com.github.stephenc.findbugs</groupId>
<artifactId>findbugs-annotations</artifactId>
<version>1.3.9-1</version>
@@ -1034,7 +1092,7 @@
<version>2.8.1</version>
</plugin>
- <plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/replication/pom.xml
----------------------------------------------------------------------
diff --git a/replication/pom.xml b/replication/pom.xml
index 8c4d6b4..43b6463 100644
--- a/replication/pom.xml
+++ b/replication/pom.xml
@@ -26,9 +26,9 @@
<artifactId>falcon-main</artifactId>
<version>0.7-SNAPSHOT</version>
</parent>
- <artifactId>falcon-replication</artifactId>
- <description>Apache Falcon Replication Module</description>
- <name>Apache Falcon Replication</name>
+ <artifactId>falcon-distcp-replication</artifactId>
+ <description>Apache Falcon Distcp Replication Module</description>
+ <name>Apache Falcon Distcp Replication</name>
<packaging>jar</packaging>
<profiles>
@@ -46,6 +46,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </dependency>
</dependencies>
</profile>
</profiles>