You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ba...@apache.org on 2016/08/16 22:48:37 UTC
[3/5] falcon git commit: Update falcon branch 0.10-refactored-ui to
be up to date with branch 0.10
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-process-pi.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-process-pi.xml b/examples/entity/spark/spark-process-pi.xml
new file mode 100644
index 0000000..65c81cf
--- /dev/null
+++ b/examples/entity/spark/spark-process-pi.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="spark-pi" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <cluster name="local">
+ <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
+ </cluster>
+ </clusters>
+
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>minutes(5)</frequency>
+ <timezone>UTC</timezone>
+
+ <workflow engine="spark" path="/app/spark/"/>
+ <spark-attributes>
+ <master>local</master>
+ <name>Spark PI</name>
+ <class>org.apache.falcon.example.spark.SparkPI</class>
+ <jar>/app/spark/lib/falcon-examples.jar</jar>
+ <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+ <arg>2</arg>
+ </spark-attributes>
+
+ <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-process.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-process.xml b/examples/entity/spark/spark-process.xml
new file mode 100644
index 0000000..b9ecc98
--- /dev/null
+++ b/examples/entity/spark/spark-process.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+<process name="spark-process" xmlns="uri:falcon:process:0.1">
+ <clusters>
+ <cluster name="local">
+ <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
+ </cluster>
+ </clusters>
+
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>minutes(5)</frequency>
+ <timezone>UTC</timezone>
+
+ <inputs>
+ <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+ <input name="inpaths" feed="in" start="now(0,-5)" end="now(0,-1)"/>
+ </inputs>
+
+ <outputs>
+ <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+ <output name="outpath" feed="out" instance="now(0,0)"/>
+ </outputs>
+
+ <workflow engine="spark" path="/app/spark"/>
+ <spark-attributes>
+ <master>local</master>
+ <name>Java Spark Wordcount</name>
+ <class>org.apache.falcon.example.spark.SparkWordCount</class>
+ <jar>/app/spark/lib/falcon-examples.jar</jar>
+ <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+ </spark-attributes>
+
+ <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/entity/spark/spark-sql-process.xml
----------------------------------------------------------------------
diff --git a/examples/entity/spark/spark-sql-process.xml b/examples/entity/spark/spark-sql-process.xml
new file mode 100644
index 0000000..cdd2ccc
--- /dev/null
+++ b/examples/entity/spark/spark-sql-process.xml
@@ -0,0 +1,55 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+<process name="spark-sql-process" xmlns="uri:falcon:process:0.1">
+ <!-- where -->
+ <clusters>
+ <cluster name="hcat-local">
+ <validity start="2013-11-15T00:05Z" end="2013-11-15T01:05Z"/>
+ </cluster>
+ </clusters>
+
+ <!-- when -->
+ <parallel>1</parallel>
+ <order>LIFO</order>
+ <frequency>minutes(5)</frequency>
+ <timezone>UTC</timezone>
+
+ <!-- what -->
+ <inputs>
+ <!-- In the workflow, the input paths will be available in a variable 'inpaths' -->
+ <input name="inparts" feed="hcat-in" start="now(0,-5)" end="now(0,-1)"/>
+ </inputs>
+
+ <outputs>
+ <!-- In the workflow, the output path will be available in a variable 'outpath' -->
+ <output name="outpart" feed="hcat-out" instance="now(0,0)"/>
+ </outputs>
+
+ <workflow engine="spark" path="/app/spark"/>
+ <spark-attributes>
+ <master>local</master>
+ <name>Spark SQL</name>
+ <class>org.apache.falcon.example.spark.SparkSQLProcessTable</class>
+ <jar>/app/spark/lib/falcon-examples.jar</jar>
+ <spark-opts>--num-executors 1 --driver-memory 512m --executor-memory 512m --executor-cores 1</spark-opts>
+ </spark-attributes>
+
+ <retry policy="periodic" delay="minutes(3)" attempts="3"/>
+
+</process>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 2ff3011..a1aedf8 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -23,11 +23,12 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-examples</artifactId>
<description>Apache Falcon Examples</description>
<name>Apache Falcon Examples</name>
+ <packaging>jar</packaging>
<dependencies>
<dependency>
@@ -35,6 +36,44 @@
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.mesos</groupId>
+ <artifactId>mesos</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.10</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java
new file mode 100644
index 0000000..7ae0b6b
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkPI.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Computes an approximation to pi.
+ * Usage: JavaSparkPi [slices]
+ */
+public final class SparkPI {
+
+ private SparkPI() {
+ }
+
+ public static void main(String[] args) throws Exception {
+ SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
+
+ int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
+ int n = 1 * slices;
+ System.out.println("n:"+n+"\tslices:"+slices);
+ List<Integer> l = new ArrayList<Integer>(n);
+ for (int i = 0; i < n; i++) {
+ l.add(i);
+ }
+
+ JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
+
+ int count = dataSet.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer integer) {
+ double x = Math.random() * 2 - 1;
+ double y = Math.random() * 2 - 1;
+ return (x * x + y * y < 1) ? 1 : 0;
+ }
+ }).reduce(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer integer, Integer integer2) {
+ return integer + integer2;
+ }
+ });
+
+ System.out.println("Pi is roughly " + 4.0 * count / n);
+
+ jsc.stop();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
new file mode 100644
index 0000000..5e9f092
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkSQLProcessTable.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+/**
+ * Spark SQL Example.
+ */
+
+public final class SparkSQLProcessTable {
+
+ private SparkSQLProcessTable() {
+ }
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.out.println("Arguments must contain details for input or output table");
+ System.exit(0);
+ }
+
+ SparkConf conf = new SparkConf().setAppName("SparkSQL example");
+ SparkContext sc = new SparkContext(conf);
+ HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc);
+
+ String sqlQuery = "FROM " +args[2]+"."+args[1]+ " INSERT OVERWRITE TABLE " +args[5]+"."+args[4]
+ +" PARTITION("+args[3]+") SELECT word, SUM(cnt) AS cnt WHERE "+args[0]+" GROUP BY word";
+
+ DataFrame df = sqlContext.sql(sqlQuery);
+ df.show();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java b/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java
new file mode 100644
index 0000000..f74a536
--- /dev/null
+++ b/examples/src/main/java/org/apache/falcon/example/spark/SparkWordCount.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.example.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+import java.util.Arrays;
+
+/**
+ * Spark Word Count example.
+ */
+public final class SparkWordCount {
+
+ private SparkWordCount() {
+ }
+ protected static final FlatMapFunction<String, String> WORDS_EXTRACTOR =
+ new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String s) throws Exception {
+ return Arrays.asList(s.split(" "));
+ }
+ };
+
+ protected static final PairFunction<String, String, Integer> WORDS_MAPPER =
+ new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ };
+
+ protected static final Function2<Integer, Integer, Integer> WORDS_REDUCER =
+ new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer a, Integer b) throws Exception {
+ return a + b;
+ }
+ };
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ System.err.println("Please provide the input file full path as argument");
+ System.exit(0);
+ }
+
+ SparkConf conf = new SparkConf().setAppName("Java WordCount");
+ JavaSparkContext context = new JavaSparkContext(conf);
+ JavaRDD<String> file = context.textFile(args[0]);
+ JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);
+ JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
+ JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);
+ counter.saveAsTextFile(args[1]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 4243dee..09d4249 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-extensions</artifactId>
<description>Apache Falcon server side extensions Module</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
index 949aea5..9222e0a 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
@@ -39,6 +39,7 @@ public class HiveMirroringExtension extends AbstractExtension {
private static final String ALL_TABLES = "*";
private static final String COMMA_DELIMITER = ",";
private static final String SECURE_RESOURCE = "-secure";
+ private static final String NOT_APPLICABLE = "NA";
@Override
public String getName() {
@@ -122,6 +123,12 @@ public class HiveMirroringExtension extends AbstractExtension {
additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
jobName);
+ // Get the first source DB
+ additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_DATABASE.getName(),
+ extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_DATABASES
+ .getName()).trim().split(",")[0]
+ );
+
String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
// Add required properties of cluster where job should run
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(),
@@ -210,7 +217,7 @@ public class HiveMirroringExtension extends AbstractExtension {
String replicationMaxMaps =
extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_MAPS.getName());
if (StringUtils.isBlank(replicationMaxMaps)) {
- additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), "5");
+ additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), "2");
}
String distcpMaxMaps = extensionProperties.getProperty(
@@ -230,6 +237,16 @@ public class HiveMirroringExtension extends AbstractExtension {
additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false");
}
+ if (StringUtils.isBlank(
+ extensionProperties.getProperty(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName()))) {
+ additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_STAGING_PATH.getName(), NOT_APPLICABLE);
+ }
+
+ if (StringUtils.isBlank(
+ extensionProperties.getProperty(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName()))) {
+ additionalProperties.put(HiveMirroringExtensionProperties.TARGET_STAGING_PATH.getName(), NOT_APPLICABLE);
+ }
+
return additionalProperties;
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
index 6c4f58d..828817b 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
@@ -27,6 +27,7 @@ public enum HiveMirroringExtensionProperties {
SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false),
SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
+ SOURCE_DATABASE("sourceDatabase", "Database to verify the setup connection", false),
SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path", false),
SOURCE_NN("sourceNN", "Source name node", false),
@@ -50,13 +51,13 @@ public enum HiveMirroringExtensionProperties {
MAX_EVENTS("maxEvents", "Maximum events to replicate", false),
MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication", false),
DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
- MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication"),
+ MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false),
CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
- CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("Job cluster kerberos principal",
- "Write EP of cluster on which replication job runs", false),
+ CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", "Job cluster kerberos principal",
+ false),
CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
- HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", false);
+ HIVE_MIRRORING_JOB_NAME("hiveJobName", "Unique hive replication job name", false);
private final String name;
private final String description;
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin-core/pom.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/pom.xml b/falcon-regression/merlin-core/pom.xml
index fa3c939..367ef07 100644
--- a/falcon-regression/merlin-core/pom.xml
+++ b/falcon-regression/merlin-core/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.falcon.regression</groupId>
<artifactId>falcon-regression</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-merlin-core</artifactId>
<description>merlin-core - utilities for Apache Falcon regression suite</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/pom.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/pom.xml b/falcon-regression/merlin/pom.xml
index 73be13e..33012fc 100644
--- a/falcon-regression/merlin/pom.xml
+++ b/falcon-regression/merlin/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.falcon.regression</groupId>
<artifactId>falcon-regression</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-merlin</artifactId>
<description>Merlin - Regression test suite for Apache Falcon</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index ec2b877..6405b30 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -56,7 +56,7 @@ import java.util.List;
* On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time.
* Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement.
*/
-@Test(groups = { "distributed", "embedded", "sanity" })
+@Test(groups = { "distributed", "embedded", "sanity", "multiCluster" })
public class FeedLateRerunTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
index dad0dc2..3367817 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java
@@ -60,7 +60,7 @@ import java.util.Map;
* feed replication test.
* Replicates empty directories as well as directories containing data.
*/
-@Test(groups = { "distributed", "embedded", "sanity" })
+@Test(groups = { "distributed", "embedded", "sanity", "multiCluster" })
public class FeedReplicationTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
index df1716f..dbb93eb 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/InstanceSummaryTest.java
@@ -173,7 +173,7 @@ public class InstanceSummaryTest extends BaseTestClass {
/**
* Adjust multi-cluster process. Submit and schedule it. Get its instances summary.
*/
- @Test(enabled = true, timeOut = 1200000)
+ @Test(enabled = true, timeOut = 1200000, groups = "multiCluster")
public void testSummaryMultiClusterProcess() throws JAXBException,
ParseException, IOException, URISyntaxException, AuthenticationException,
InterruptedException {
@@ -208,7 +208,7 @@ public class InstanceSummaryTest extends BaseTestClass {
/**
* Adjust multi-cluster feed. Submit and schedule it. Get its instances summary.
*/
- @Test(enabled = true, timeOut = 1200000)
+ @Test(enabled = true, timeOut = 1200000, groups = "multiCluster")
public void testSummaryMultiClusterFeed() throws JAXBException, ParseException, IOException,
URISyntaxException, OozieClientException, AuthenticationException,
InterruptedException {
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
index 20f8f46..ce4c903 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceColoMixedTest.java
@@ -48,7 +48,7 @@ import java.util.List;
/**
* Process instance mixed colo tests.
*/
-@Test(groups = { "distributed", "embedded" })
+@Test(groups = { "distributed", "embedded", "multiCluster" })
public class ProcessInstanceColoMixedTest extends BaseTestClass {
private final String baseTestHDFSDir = cleanAndGetTestDir();
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
index eb20d7c..db020ec 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatFeedOperationsTest.java
@@ -119,7 +119,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
*
* @throws Exception
*/
- @Test(groups = {"singleCluster"})
+ @Test(groups = {"multiCluster"})
public void submitFeedWhenTableDoesNotExist() throws Exception {
Bundle.submitCluster(bundles[1]);
feed = bundles[1].getInputFeedFromBundle();
@@ -159,7 +159,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
*
* @throws Exception
*/
- @Test
+ @Test(groups = {"multiCluster"})
public void submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget() throws Exception {
Bundle.submitCluster(bundles[0], bundles[1]);
final String startDate = "2010-01-01T20:00Z";
@@ -192,7 +192,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
*
* @throws Exception
*/
- @Test
+ @Test(groups = {"multiCluster"})
public void suspendAndResumeReplicationFeed() throws Exception {
submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
@@ -215,7 +215,7 @@ public class HCatFeedOperationsTest extends BaseTestClass {
*
* @throws Exception
*/
- @Test
+ @Test(groups = {"multiCluster"})
public void deleteReplicationFeed() throws Exception {
submitAndScheduleReplicationFeedWhenTableExistsOnSourceAndTarget();
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
index a96b17e..6643ce5 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hcat/HCatReplicationTest.java
@@ -64,7 +64,7 @@ import java.util.Map;
/**
* Tests for replication with hcat.
*/
-@Test(groups = "embedded")
+@Test(groups = {"embedded", "multiCluster"})
public class HCatReplicationTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HCatReplicationTest.class);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
index 4a2d913..07996d5 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HdfsRecipeTest.java
@@ -50,7 +50,7 @@ import java.util.List;
/**
* Hdfs recipe test.
*/
-@Test(groups = "embedded")
+@Test(groups = {"embedded", "multiCluster"})
public class HdfsRecipeTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HdfsRecipeTest.class);
private final ColoHelper cluster = servers.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
index 4dab0db..7cd71e1 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java
@@ -65,7 +65,7 @@ import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanil
/**
* Hive DR Testing.
*/
-@Test(groups = "embedded")
+@Test(groups = {"embedded", "multiCluster"})
public class HiveDRTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class);
private static final String DB_NAME = "hdr_sdb1";
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
index 5efd69f..e281bee 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java
@@ -59,7 +59,7 @@ import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanil
/**
* Hive DR Testing for Hive database replication.
*/
-@Test(groups = "embedded")
+@Test(groups = {"embedded", "multiCluster"})
public class HiveDbDRTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class);
private final ColoHelper cluster = servers.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
index 849f67a..ac91d59 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/EntitySummaryTest.java
@@ -128,7 +128,7 @@ public class EntitySummaryTest extends BaseTestClass {
* Get status of 7 feeds and 7 instances of each feed the call should give correct information,
* instance info must be recent.
*/
- @Test
+ @Test(groups = "multiCluster")
public void getFeedSummary() throws Exception {
//prepare feed template.
bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
index 4f86594..93efbac 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java
@@ -59,7 +59,7 @@ import java.util.List;
* expected instance statuses which are being compared with actual result of -list request
* with different parameters in different order, variation, etc.
*/
-@Test(groups = { "distributed", "embedded", "sanity" })
+@Test(groups = { "distributed", "embedded", "sanity", "multiCluster"})
public class ListFeedInstancesTest extends BaseTestClass {
private static final Logger LOGGER = Logger.getLogger(ListFeedInstancesTest.class);
private OozieClient cluster2OC = serverOC.get(1);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
index 54e7805..81e0a7e 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/nativeScheduler/NativeScheduleTest.java
@@ -184,7 +184,7 @@ public class NativeScheduleTest extends BaseTestClass {
* Successfully schedule process via native scheduler through prism and server on multiple cluster.
* Schedule the same process on oozie. It should fail.
*/
- @Test(groups = {"prism", "0.2"})
+ @Test(groups = {"prism", "0.2", "multiCluster"})
public void scheduleProcessWithNativeOnTwoClusters() throws Exception {
ProcessMerlin processMerlinNative = bundles[0].getProcessObject();
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
index 405725d..0a5e9ce 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedReplicationUpdateTest.java
@@ -49,7 +49,7 @@ import java.io.IOException;
/**
* Update replication feed tests.
*/
-@Test(groups = { "distributed", "embedded" })
+@Test(groups = { "distributed", "embedded", "multiCluster" })
public class PrismFeedReplicationUpdateTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
index 137ef6f..1a1dc98 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedScheduleTest.java
@@ -38,7 +38,7 @@ import java.io.IOException;
/**
* Schedule feed via prism tests.
*/
-@Test(groups = { "distributed", "embedded" })
+@Test(groups = { "distributed", "embedded", "multiCluster"})
public class PrismFeedScheduleTest extends BaseTestClass {
private OozieClient cluster1OC = serverOC.get(0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
index a5220e3..6caac9f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedUpdateTest.java
@@ -96,7 +96,7 @@ public class PrismFeedUpdateTest extends BaseTestClass {
* Set 2 processes with common output feed. Second one is zero-input process. Update feed
* queue. TODO : complete test case
*/
- @Test(enabled = true, timeOut = 1200000)
+ @Test(enabled = true, timeOut = 1200000 , groups = "multiCluster")
public void updateFeedQueueDependentMultipleProcessOneProcessZeroInput() throws Exception {
//cluster1colo and cluster2colo are source. feed01 on cluster1colo target cluster2colo,
// feed02 on cluster2colo target cluster1colo
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
index 4aa7189..63f793f 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismProcessSnSTest.java
@@ -80,7 +80,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
* Submit and schedule process2 on cluster2. Check that process2 is running and process1 is
* not running on cluster2.
*/
- @Test(groups = {"prism", "0.2", "embedded"})
+ @Test(groups = {"prism", "0.2", "embedded", "multiCluster"})
public void testProcessSnSOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitAndScheduleProcess();
@@ -100,7 +100,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
* on cluster2. Submit process2 but schedule process1 once more. Check that process1 is running
* on cluster1 but not on cluster2.
*/
- @Test(groups = {"prism", "0.2", "embedded"})
+ @Test(groups = {"prism", "0.2", "embedded", "multiCluster"})
public void testProcessSnSForSubmittedProcessOnBothColos() throws Exception {
//schedule both bundles
bundles[0].submitProcess(true);
@@ -122,7 +122,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
* once more and check that it is still running on cluster1 but process2 isn't running on
* cluster2.
*/
- @Test(groups = {"prism", "0.2", "embedded"})
+ @Test(groups = {"prism", "0.2", "embedded", "multiCluster"})
public void testProcessSnSForSubmittedProcessOnBothColosUsingColoHelper()
throws Exception {
bundles[0].submitProcess(true);
@@ -228,7 +228,7 @@ public class PrismProcessSnSTest extends BaseTestClass {
* running. Delete both of them. Submit and schedule them once more. Check that they are
* running again.
*/
- @Test(groups = {"prism", "0.2", "embedded"})
+ @Test(groups = {"prism", "0.2", "embedded", "multiCluster"})
public void testSnSDeletedProcessOnBothColos() throws Exception {
//schedule both bundles
final String cluster1Running = cluster1.getClusterHelper().getColoName() + "/RUNNING";
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java
index e99202b..a7887da 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/MirrorTest.java
@@ -56,7 +56,7 @@ import java.sql.Connection;
import java.util.Arrays;
/** UI tests for Mirror Setup Wizard. */
-@Test(groups = "search-ui")
+@Test(groups = {"search-ui", "multiCluster"})
public class MirrorTest extends BaseUITestClass {
private static final Logger LOGGER = Logger.getLogger(MirrorTest.class);
private final String baseTestDir = cleanAndGetTestDir();
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-regression/pom.xml
----------------------------------------------------------------------
diff --git a/falcon-regression/pom.xml b/falcon-regression/pom.xml
index daa88cb..987ddaa 100644
--- a/falcon-regression/pom.xml
+++ b/falcon-regression/pom.xml
@@ -24,11 +24,11 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<groupId>org.apache.falcon.regression</groupId>
<artifactId>falcon-regression</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
<description>Regression Framework for Falcon</description>
<name>Apache Falcon Regression</name>
<packaging>pom</packaging>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/css/img/user.svg
----------------------------------------------------------------------
diff --git a/falcon-ui/app/css/img/user.svg b/falcon-ui/app/css/img/user.svg
index 60ac6c5..fb534c2 100644
--- a/falcon-ui/app/css/img/user.svg
+++ b/falcon-ui/app/css/img/user.svg
@@ -1,4 +1,20 @@
<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
<!-- Generator: Adobe Illustrator 16.0.3, SVG Export Plug-In . SVG Version: 6.00 Build 0) -->
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg version="1.1" id="User" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" x="0px" y="0px"
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/css/styles/autocomplete-tags.less
----------------------------------------------------------------------
diff --git a/falcon-ui/app/css/styles/autocomplete-tags.less b/falcon-ui/app/css/styles/autocomplete-tags.less
index c2f5dc2..4c6fc96 100644
--- a/falcon-ui/app/css/styles/autocomplete-tags.less
+++ b/falcon-ui/app/css/styles/autocomplete-tags.less
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
.top-buffer{
padding-top:20px;
}
@@ -67,4 +85,4 @@
}
.suggestions-list:focus{
outline:none;
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/js/lib/popover.js
----------------------------------------------------------------------
diff --git a/falcon-ui/app/js/lib/popover.js b/falcon-ui/app/js/lib/popover.js
index e26c870..57814df 100644
--- a/falcon-ui/app/js/lib/popover.js
+++ b/falcon-ui/app/js/lib/popover.js
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
(function(window, angular, undefined){
'use strict';
@@ -460,4 +478,4 @@
};
}
]);
-})(window, window.angular);
\ No newline at end of file
+})(window, window.angular);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/app/test/e2e/protractor.js
----------------------------------------------------------------------
diff --git a/falcon-ui/app/test/e2e/protractor.js b/falcon-ui/app/test/e2e/protractor.js
index 37d6e65..6c2d97e 100644
--- a/falcon-ui/app/test/e2e/protractor.js
+++ b/falcon-ui/app/test/e2e/protractor.js
@@ -1,7 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
exports.config = {
chromeDriver: '../../../node_modules/protractor/selenium/chromedriver',
jasmineNodeOpts: {
showColors: true
}
-};
\ No newline at end of file
+};
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/karma.conf.js
----------------------------------------------------------------------
diff --git a/falcon-ui/karma.conf.js b/falcon-ui/karma.conf.js
index 11189e8..776249f 100644
--- a/falcon-ui/karma.conf.js
+++ b/falcon-ui/karma.conf.js
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
// Karma configuration
// Generated on Wed Sep 24 2014 21:15:41 GMT-0500 (CDT)
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/falcon-ui/pom.xml
----------------------------------------------------------------------
diff --git a/falcon-ui/pom.xml b/falcon-ui/pom.xml
index 93ed9bb..bbb917c 100644
--- a/falcon-ui/pom.xml
+++ b/falcon-ui/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-ui</artifactId>
<packaging>pom</packaging>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/hadoop-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml
index e2529f1..384cceb 100644
--- a/hadoop-dependencies/pom.xml
+++ b/hadoop-dependencies/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-hadoop-dependencies</artifactId>
<description>Apache Falcon Hadoop Dependencies Module</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/lifecycle/pom.xml
----------------------------------------------------------------------
diff --git a/lifecycle/pom.xml b/lifecycle/pom.xml
index 725f1e6..b940796 100644
--- a/lifecycle/pom.xml
+++ b/lifecycle/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-feed-lifecycle</artifactId>
<description>Apache Falcon Lifecycle Module</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/lifecycle/src/main/resources/action/feed/eviction-action.xml
----------------------------------------------------------------------
diff --git a/lifecycle/src/main/resources/action/feed/eviction-action.xml b/lifecycle/src/main/resources/action/feed/eviction-action.xml
index 4ab67d2..bded1d6 100644
--- a/lifecycle/src/main/resources/action/feed/eviction-action.xml
+++ b/lifecycle/src/main/resources/action/feed/eviction-action.xml
@@ -31,7 +31,7 @@
<!-- HCatalog jars -->
<property>
<name>oozie.action.sharelib.for.java</name>
- <value>hcatalog</value>
+ <value>hcatalog,hive</value>
</property>
<property>
<name>oozie.launcher.oozie.libpath</name>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index 667c5d1..3baddd9 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-messaging</artifactId>
<description>Apache Falcon JMS messaging Module</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index 8b48e93..90bbdd3 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -92,8 +92,7 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic destination = topicSession.createTopic(topicName);
- topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID,
- WorkflowNameBuilder.WorkflowName.getJMSFalconSelector(), false);
+ topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
topicSubscriber.setMessageListener(this);
connection.setExceptionListener(this);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 0ba9464..cffdb59 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -83,11 +83,6 @@ public class JMSMessageConsumerTest {
public void sendMessages(String topic, WorkflowExecutionContext.Type type)
throws JMSException, FalconException, IOException {
- sendMessages(topic, type, true);
- }
-
- public void sendMessages(String topic, WorkflowExecutionContext.Type type, boolean isFalconWF)
- throws JMSException, FalconException, IOException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
@@ -105,10 +100,10 @@ public class JMSMessageConsumerTest {
message = getMockFalconMessage(i, session);
break;
case WORKFLOW_JOB:
- message = getMockOozieMessage(i, session, isFalconWF);
+ message = getMockOozieMessage(i, session);
break;
case COORDINATOR_ACTION:
- message = getMockOozieCoordMessage(i, session, isFalconWF);
+ message = getMockOozieCoordMessage(i, session);
default:
break;
}
@@ -117,15 +112,10 @@ public class JMSMessageConsumerTest {
}
}
- private Message getMockOozieMessage(int i, Session session, boolean isFalconWF)
- throws FalconException, JMSException {
+ private Message getMockOozieMessage(int i, Session session) throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "WORKFLOW_JOB");
- if (isFalconWF) {
- message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
- } else {
- message.setStringProperty("appName", "OozieSampleShellWF");
- }
+ message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
message.setStringProperty("user", "falcon");
switch(i % 4) {
case 0:
@@ -152,15 +142,11 @@ public class JMSMessageConsumerTest {
return message;
}
- private Message getMockOozieCoordMessage(int i, Session session, boolean isFalconWF)
+ private Message getMockOozieCoordMessage(int i, Session session)
throws FalconException, JMSException {
TextMessage message = session.createTextMessage();
message.setStringProperty("appType", "COORDINATOR_ACTION");
- if (isFalconWF) {
- message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
- } else {
- message.setStringProperty("appName", "OozieSampleShellWF");
- }
+ message.setStringProperty("appName", "FALCON_PROCESS_DEFAULT_process1");
message.setStringProperty("user", "falcon");
switch(i % 5) {
case 0:
@@ -245,15 +231,10 @@ public class JMSMessageConsumerTest {
sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
final BrokerView adminView = broker.getAdminView();
-
- Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
-// Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
sendMessages(SECONDARY_TOPIC_NAME, WorkflowExecutionContext.Type.POST_PROCESSING);
-// Assert.assertEquals(adminView.getTotalEnqueueCount(), 18);
- Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
} catch (Exception e) {
Assert.fail("This should not have thrown an exception.", e);
@@ -265,9 +246,6 @@ public class JMSMessageConsumerTest {
sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB);
final BrokerView adminView = broker.getAdminView();
-
- Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
-// Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
// Async operations. Give some time for messages to be processed.
@@ -283,9 +261,6 @@ public class JMSMessageConsumerTest {
sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.COORDINATOR_ACTION);
final BrokerView adminView = broker.getAdminView();
-
- Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
-// Assert.assertEquals(adminView.getTotalEnqueueCount(), 12);
Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
// Async operations. Give some time for messages to be processed.
@@ -303,24 +278,4 @@ public class JMSMessageConsumerTest {
broker.stop();
subscriber.closeSubscriber();
}
-
- @Test
- public void testJMSMessagesFromOozieForNonFalconWF() throws Exception {
- sendMessages(TOPIC_NAME, WorkflowExecutionContext.Type.WORKFLOW_JOB, false /* isFalconWF */);
-
- final BrokerView adminView = broker.getAdminView();
-
- Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
- Assert.assertEquals(adminView.getTotalEnqueueCount(), 10);
- Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
- Assert.assertEquals(adminView.getTotalMessageCount(), 0);
-
- Thread.sleep(100);
- Mockito.verify(jobEndService, Mockito.never()).notifyStart(Mockito.any(WorkflowExecutionContext.class));
- Mockito.verify(jobEndService, Mockito.never()).notifySuccess(Mockito.any(WorkflowExecutionContext.class));
- Mockito.verify(jobEndService, Mockito.never()).notifySuspend(Mockito.any(WorkflowExecutionContext.class));
- Mockito.verify(jobEndService, Mockito.never()).notifyWait(Mockito.any(WorkflowExecutionContext.class));
- Mockito.verify(jobEndService, Mockito.never()).notifyFailure(Mockito.any(WorkflowExecutionContext.class));
- }
-
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index 48cf80d..6266d0d 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -25,7 +25,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-metrics</artifactId>
<description>Apache Falcon Metrics</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml
index 71ae45d..afbb2e3 100644
--- a/oozie-el-extensions/pom.xml
+++ b/oozie-el-extensions/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-oozie-el-extension</artifactId>
<description>Apache Falcon Oozie EL Extension</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
index a6ff487..f0cb7cd 100644
--- a/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
+++ b/oozie-el-extensions/src/main/java/org/apache/oozie/extensions/OozieELExtensions.java
@@ -100,7 +100,7 @@ public final class OozieELExtensions {
String emptyDir = (String) eval.getVariable(dataInName + ".empty-dir");
XLog.getLog(OozieELExtensions.class).debug("No instances could be resolved. Passing empty dir : "
+ emptyDir);
- uristr = emptyDir;
+ return emptyDir;
}
} catch (Exception e) {
throw new RuntimeException("Failed to resolve instance range for " + dataInName, e);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
index b9bf594..2be8603 100644
--- a/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
+++ b/oozie-el-extensions/src/test/java/org/apache/oozie/extensions/TestOozieELExtensions.java
@@ -168,6 +168,8 @@ public class TestOozieELExtensions {
"*/US", "_DONE", },
// With availability flag. All instances missing
{"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "null", "_FINISH"},
+ // With availability flag and partitions. All instances missing
+ {"hdfs://localhost:8020/projects/falcon/staging/EMPTY_DIR_DONT_DELETE", "*", "_FINISH"},
// No availability flag. One instance missing
{"hdfs://localhost:8020/clicks/2009/09/02/09", "null", ""},
// With availability flag. One instance missing.
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index c83daf6..a784c5a 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.falcon</groupId>
<artifactId>falcon-main</artifactId>
- <version>0.10-SNAPSHOT</version>
+ <version>0.10</version>
</parent>
<artifactId>falcon-oozie-adaptor</artifactId>
<description>Apache Falcon Oozie Adaptor Module</description>
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index b0e46f0..07d293c 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -275,11 +275,6 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
Path scriptPath = new Path(buildPath, "scripts");
copyHiveScript(fs, scriptPath, IMPORT_HQL);
copyHiveScript(fs, scriptPath, EXPORT_HQL);
-
- // create hive conf to stagingDir
- Path confPath = new Path(buildPath + "/conf");
- persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
- persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
} catch (IOException e) {
throw new FalconException("Unable to create hive conf files", e);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index 5a62130..010446b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -24,7 +24,6 @@ import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.HiveUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -79,11 +78,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
marshal(cluster, workflow, buildPath);
Properties props = getProperties(buildPath, wfName);
props.putAll(createDefaultConfiguration(cluster));
- if (EntityUtil.isTableStorageType(cluster, entity)) {
- // todo: kludge send source hcat creds for coord dependency check to pass
- props.putAll(HiveUtil.getHiveCredentials(srcCluster));
- props.putAll(HiveUtil.getHiveCredentials(cluster));
- }
+
props.putAll(getWorkflowProperties(entity));
props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
// Write out the config to config-default.xml
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
index 629485d..3da97d3 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/HCatReplicationWorkflowBuilder.java
@@ -22,15 +22,18 @@ import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.HiveUtil;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import javax.xml.bind.JAXBElement;
import java.util.Arrays;
+import java.util.Map;
import java.util.Properties;
/**
@@ -60,6 +63,15 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
//Add pre-processing
if (shouldPreProcess()) {
ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+ Properties hiveConf = HiveUtil.getHiveCredentials(src);
+ for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
+ CONFIGURATION.Property prop = new CONFIGURATION.Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ LOG.info("Adding config to replication hive preprocessing action : key = {} value = {}",
+ e.getKey(), e.getValue());
+ action.getJava().getConfiguration().getProperty().add(prop);
+ }
addHDFSServersConfig(action, src, target);
addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(action);
@@ -72,6 +84,16 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
OozieUtils.unMarshalHiveAction(export);
org.apache.falcon.oozie.hive.ACTION hiveExportAction = exportActionJaxbElement.getValue();
addHDFSServersConfig(hiveExportAction, src, target);
+ Properties hiveConf = HiveUtil.getHiveCredentials(src);
+ for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
+ org.apache.falcon.oozie.hive.CONFIGURATION.Property prop =
+ new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ LOG.info("Adding config to replication hive export action : key = {} value = {}",
+ e.getKey(), e.getValue());
+ hiveExportAction.getConfiguration().getProperty().add(prop);
+ }
OozieUtils.marshalHiveAction(export, exportActionJaxbElement);
addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(export);
@@ -89,6 +111,16 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
OozieUtils.unMarshalHiveAction(importAction);
org.apache.falcon.oozie.hive.ACTION hiveImportAction = importActionJaxbElement.getValue();
addHDFSServersConfig(hiveImportAction, src, target);
+ Properties hiveConf2 = HiveUtil.getHiveCredentials(target);
+ for (Map.Entry<Object, Object> e : hiveConf2.entrySet()) {
+ org.apache.falcon.oozie.hive.CONFIGURATION.Property prop =
+ new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+ prop.setName((String) e.getKey());
+ prop.setValue((String) e.getValue());
+ LOG.info("Adding config to replication hive import action : key = {} value = {}",
+ e.getKey(), e.getValue());
+ hiveImportAction.getConfiguration().getProperty().add(prop);
+ }
OozieUtils.marshalHiveAction(importAction, importActionJaxbElement);
addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(importAction);
@@ -133,8 +165,8 @@ public class HCatReplicationWorkflowBuilder extends FeedReplicationWorkflowBuild
(org.apache.falcon.oozie.workflow.ACTION) object;
String actionName = action.getName();
if (PREPROCESS_ACTION_NAME.equals(actionName)) {
+
// add reference to hive-site conf to each action
- action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
if (isSecurityEnabled) { // add a reference to credential in the action
action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
index dc5a491..51db75d 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/SparkProcessWorkflowBuilder.java
@@ -30,12 +30,10 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.spark.CONFIGURATION.Property;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.util.OozieUtils;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import javax.xml.bind.JAXBElement;
@@ -46,6 +44,7 @@ import java.util.List;
*/
public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder {
private static final String ACTION_TEMPLATE = "/action/process/spark-action.xml";
+ private static final String FALCON_PREFIX = "falcon_";
public SparkProcessWorkflowBuilder(Process entity) {
super(entity);
@@ -58,7 +57,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
String sparkMasterURL = entity.getSparkAttributes().getMaster();
- String sparkFilePath = entity.getSparkAttributes().getJar();
+ Path sparkJarFilePath = new Path(entity.getSparkAttributes().getJar());
String sparkJobName = entity.getSparkAttributes().getName();
String sparkOpts = entity.getSparkAttributes().getSparkOpts();
String sparkClassName = entity.getSparkAttributes().getClazz();
@@ -89,21 +88,32 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
argList.addAll(sparkArgs);
}
- addInputFeedsAsArgument(argList, cluster);
+ //Adding output first so that final order must have input and then output followed by user's arguments.
addOutputFeedsAsArgument(argList, cluster);
+ addInputFeedsAsArgument(argList, cluster);
- sparkAction.setJar(addUri(sparkFilePath, cluster));
-
- setSparkLibFileToWorkflowLib(sparkFilePath, entity);
+ // In Oozie spark action, value for jar is either Java jar file path or Python file path.
+ validateSparkJarFilePath(sparkJarFilePath);
+ sparkAction.setJar(sparkJarFilePath.getName());
+ setSparkLibFileToWorkflowLib(sparkJarFilePath.toString(), entity);
propagateEntityProperties(sparkAction);
OozieUtils.marshalSparkAction(action, actionJaxbElement);
return action;
}
- private void setSparkLibFileToWorkflowLib(String sparkFile, Process entity) {
+ private void setSparkLibFileToWorkflowLib(String sparkJarFilePath, Process entity) {
if (StringUtils.isEmpty(entity.getWorkflow().getLib())) {
- entity.getWorkflow().setLib(sparkFile);
+ entity.getWorkflow().setLib(sparkJarFilePath);
+ } else {
+ String workflowLib = entity.getWorkflow().getLib() + "," + sparkJarFilePath;
+ entity.getWorkflow().setLib(workflowLib);
+ }
+ }
+
+ private void validateSparkJarFilePath(Path sparkJarFilePath) throws FalconException {
+ if (!sparkJarFilePath.isAbsolute()) {
+ throw new FalconException("Spark jar file path must be absolute:"+sparkJarFilePath);
}
}
@@ -145,6 +155,7 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
return;
}
+ //Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect.
int numInputFeed = entity.getInputs().getInputs().size();
while (numInputFeed > 0) {
Input input = entity.getInputs().getInputs().get(numInputFeed-1);
@@ -153,6 +164,10 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
final String inputName = input.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
argList.add(0, "${" + inputName + "}");
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ argList.add(0, "${" + FALCON_PREFIX+inputName+"_database" + "}");
+ argList.add(0, "${" + FALCON_PREFIX+inputName+"_table" + "}");
+ argList.add(0, "${" + FALCON_PREFIX+inputName+"_partition_filter_hive" + "}");
}
numInputFeed--;
}
@@ -163,26 +178,24 @@ public class SparkProcessWorkflowBuilder extends ProcessExecutionWorkflowBuilder
return;
}
- for(Output output : entity.getOutputs().getOutputs()) {
+ //Adding to the 0th index and getting the args shifted as arguments are added to get the desired effect.
+ int numOutputFeed = entity.getOutputs().getOutputs().size();
+ while (numOutputFeed > 0) {
+ Output output = entity.getOutputs().getOutputs().get(numOutputFeed-1);
Feed feed = EntityUtil.getEntity(EntityType.FEED, output.getFeed());
Storage storage = FeedHelper.createStorage(cluster, feed);
final String outputName = output.getName();
if (storage.getType() == Storage.TYPE.FILESYSTEM) {
- argList.add(argList.size(), "${" + outputName + "}");
+ argList.add(0, "${" + outputName + "}");
+ } else if (storage.getType() == Storage.TYPE.TABLE) {
+ argList.add(0, "${" + FALCON_PREFIX+outputName+"_database" + "}");
+ argList.add(0, "${" + FALCON_PREFIX+outputName+"_table" + "}");
+ argList.add(0, "${" + FALCON_PREFIX+outputName+"_partitions_hive" + "}");
}
+ numOutputFeed--;
}
}
- private String addUri(String jarFile, Cluster cluster) throws FalconException {
- FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
- ClusterHelper.getConfiguration(cluster));
- Path jarFilePath = new Path(jarFile);
- if (jarFilePath.isAbsoluteAndSchemeAuthorityNull()) {
- return fs.makeQualified(jarFilePath).toString();
- }
- return jarFile;
- }
-
private String getClusterEntitySparkMaster(Cluster cluster) {
return ClusterHelper.getSparkMasterEndPoint(cluster);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/01a303e3/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4961896..ea914f6 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -48,6 +48,14 @@ public class FalconPostProcessing extends Configured implements Tool {
// serialize the context to HDFS under logs dir before sending the message
context.serialize();
+ boolean systemNotificationEnabled = Boolean.parseBoolean(context.
+ getValue(WorkflowExecutionArgs.SYSTEM_JMS_NOTIFICATION_ENABLED, "true"));
+
+ if (systemNotificationEnabled) {
+ LOG.info("Sending Falcon message {} ", context);
+ invokeFalconMessageProducer(context);
+ }
+
String userBrokerUrl = context.getValue(WorkflowExecutionArgs.USER_BRKR_URL);
boolean userNotificationEnabled = Boolean.parseBoolean(context.
getValue(WorkflowExecutionArgs.USER_JMS_NOTIFICATION_ENABLED, "true"));
@@ -72,6 +80,13 @@ public class FalconPostProcessing extends Configured implements Tool {
jmsMessageProducer.sendMessage(WorkflowExecutionContext.USER_MESSAGE_ARGS);
}
+ private void invokeFalconMessageProducer(WorkflowExecutionContext context) throws Exception {
+ JMSMessageProducer jmsMessageProducer = JMSMessageProducer.builder(context)
+ .type(JMSMessageProducer.MessageType.FALCON)
+ .build();
+ jmsMessageProducer.sendMessage();
+ }
+
private void invokeLogProducer(WorkflowExecutionContext context) {
// todo: need to move this out to Falcon in-process
if (UserGroupInformation.isSecurityEnabled()) {