You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/01/22 11:23:32 UTC
[1/3] FALCON-123 Improve build speeds in falcon. Contributed by
Srikanth Sundarrajan
Updated Branches:
refs/heads/master d555dd501 -> d1642beab
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
new file mode 100644
index 0000000..b96aa48
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseySmokeIT.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.resource;
+
+import com.sun.jersey.api.client.ClientResponse;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.oozie.client.BundleJob;
+import org.apache.oozie.client.Job.Status;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test class for Entity REST APIs.
+ *
+ * Tests should be enabled only in local environments, as they need a running instance of the web server.
+ */
+@Test
+public class EntityManagerJerseySmokeIT {
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare();
+ }
+
+ private ThreadLocal<TestContext> contexts = new ThreadLocal<TestContext>();
+
+ private TestContext newContext() {
+ contexts.set(new TestContext());
+ return contexts.get();
+ }
+
+ @AfterMethod
+ public void cleanup() throws Exception {
+ TestContext testContext = contexts.get();
+ if (testContext != null) {
+ testContext.killOozieJobs();
+ }
+ contexts.remove();
+ }
+
+ public void testProcessDeleteAndSchedule() throws Exception {
+ //Submit process with invalid property so that coord submit fails and bundle goes to failed state
+ TestContext context = newContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+ String tmpFileName = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
+ Property prop = new Property();
+ prop.setName("newProp");
+ prop.setValue("${formatTim()}");
+ process.getProperties().getProperties().add(prop);
+ File tmpFile = context.getTempFile();
+ EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+ context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
+ context.waitForBundleStart(Status.FAILED);
+
+ //Delete and re-submit the process with correct workflow
+ ClientResponse clientRepsonse = context.service.path("api/entities/delete/process/"
+ + context.processName).header(
+ "Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).delete(ClientResponse.class);
+ context.assertSuccessful(clientRepsonse);
+ process.getWorkflow().setPath("/falcon/test/workflow");
+ tmpFile = context.getTempFile();
+ EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+ clientRepsonse = context.service.path("api/entities/submitAndSchedule/process").
+ header("Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+ .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+ context.assertSuccessful(clientRepsonse);
+
+ //Assert that new schedule creates new bundle
+ List<BundleJob> bundles = context.getBundles();
+ Assert.assertEquals(bundles.size(), 2);
+ }
+
+ public void testFeedSchedule() throws Exception {
+ TestContext context = newContext();
+ ClientResponse response;
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ response = context.submitToFalcon(context.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
+ context.assertSuccessful(response);
+
+ response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+ context.assertSuccessful(response);
+
+ EntityManagerJerseyIT.createTestData(context);
+ ClientResponse clientRepsonse = context.service
+ .path("api/entities/schedule/feed/" + overlay.get("inputFeedName"))
+ .header("Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
+ .post(ClientResponse.class);
+ context.assertSuccessful(clientRepsonse);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
index ac15391..c0785ba 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
@@ -37,7 +37,7 @@ import javax.ws.rs.core.MediaType;
/**
* Test class for Process Instance REST API.
*/
-@Test(enabled = false)
+@Test(enabled = false, groups = {"exhaustive"})
public class ProcessInstanceManagerIT {
private static final String START_INSTANCE = "2012-04-20T00:00Z";
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
index 3fcd5dc..9e10956 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/TestContext.java
@@ -27,7 +27,6 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.cli.FalconCLI;
import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.cluster.util.StandAloneCluster;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
@@ -39,6 +38,7 @@ import org.apache.falcon.workflow.engine.OozieClientFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.log4j.Logger;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
@@ -66,10 +66,13 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -77,6 +80,8 @@ import java.util.regex.Pattern;
* Base test class for CLI, Entity and Process Instances.
*/
public class TestContext {
+ private static final Logger LOG = Logger.getLogger(TestContext.class);
+
public static final String FEED_TEMPLATE1 = "/feed-template1.xml";
public static final String FEED_TEMPLATE2 = "/feed-template2.xml";
@@ -170,7 +175,7 @@ public class TestContext {
}
System.out.println("Waiting for workflow to start");
- Thread.sleep(i * 1000);
+ Thread.sleep(i * 500);
}
throw new Exception("Workflow for " + entityName + " hasn't started in oozie");
}
@@ -183,25 +188,26 @@ public class TestContext {
waitForWorkflowStart(outputFeedName);
}
- public void waitForBundleStart(Status status) throws Exception {
+ public void waitForBundleStart(Status... status) throws Exception {
OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
List<BundleJob> bundles = getBundles();
if (bundles.isEmpty()) {
return;
}
+ Set<Status> statuses = new HashSet<Status>(Arrays.asList(status));
String bundleId = bundles.get(0).getId();
for (int i = 0; i < 15; i++) {
Thread.sleep(i * 1000);
BundleJob bundle = ozClient.getBundleJobInfo(bundleId);
- if (bundle.getStatus() == status) {
- if (status == Status.FAILED) {
+ if (statuses.contains(bundle.getStatus())) {
+ if (statuses.contains(Status.FAILED) || statuses.contains(Status.KILLED)) {
return;
}
boolean done = false;
for (CoordinatorJob coord : bundle.getCoordinators()) {
- if (coord.getStatus() == status) {
+ if (statuses.contains(coord.getStatus())) {
done = true;
}
}
@@ -220,14 +226,13 @@ public class TestContext {
InstancesResult.class);
unmarshaller = jaxbContext.createUnmarshaller();
marshaller = jaxbContext.createMarshaller();
-
+ configure();
} catch (Exception e) {
throw new RuntimeException(e);
}
- configure();
}
- public void configure() {
+ public void configure() throws Exception {
StartupProperties.get().setProperty(
"application.services",
StartupProperties.get().getProperty("application.services")
@@ -236,12 +241,14 @@ public class TestContext {
StartupProperties.get().setProperty("config.store.uri", store + System.currentTimeMillis());
ClientConfig config = new DefaultClientConfig();
Client client = Client.create(config);
+ client.setReadTimeout(500000);
+ client.setConnectTimeout(500000);
this.service = client.resource(UriBuilder.fromUri(BASE_URL).build());
}
- public void setCluster(String file) throws Exception {
- cluster = StandAloneCluster.newCluster(file);
- clusterName = cluster.getCluster().getName();
+ public void setCluster(String cName) throws Exception {
+ cluster = EmbeddedCluster.newCluster(cName, true);
+ this.clusterName = cluster.getCluster().getName();
}
/**
@@ -280,7 +287,7 @@ public class TestContext {
String tmpFile = overlayParametersOverTemplate(template, overlay);
if (entityType == EntityType.CLUSTER) {
try {
- cluster = StandAloneCluster.newCluster(tmpFile);
+ cluster = EmbeddedCluster.newCluster(overlay.get("cluster"), true);
clusterName = cluster.getCluster().getName();
} catch (Exception e) {
throw new IOException("Unable to setup cluster info", e);
@@ -434,8 +441,8 @@ public class TestContext {
overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
overlay.put("colo", "gs");
TestContext context = new TestContext();
- String file = context.overlayParametersOverTemplate(clusterTemplate, overlay);
- EmbeddedCluster cluster = StandAloneCluster.newCluster(file);
+ context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ EmbeddedCluster cluster = EmbeddedCluster.newCluster(overlay.get("cluster"), true);
cleanupStore();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/resources/cluster-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/cluster-template.xml b/webapp/src/test/resources/cluster-template.xml
index 32eb643..bc17fb9 100644
--- a/webapp/src/test/resources/cluster-template.xml
+++ b/webapp/src/test/resources/cluster-template.xml
@@ -19,9 +19,9 @@
<cluster colo="##colo##" description="" name="##cluster##" xmlns="uri:falcon:cluster:0.1">
<interfaces>
- <interface type="readonly" endpoint="hftp://localhost:41110"
+ <interface type="readonly" endpoint="jail://global:00"
version="0.20.2"/>
- <interface type="write" endpoint="hdfs://localhost:41020"
+ <interface type="write" endpoint="jail://global:00"
version="0.20.2"/>
<interface type="execute" endpoint="localhost:41021" version="0.20.2"/>
<interface type="workflow" endpoint="http://localhost:41000/oozie/"
[2/3] FALCON-123 Improve build speeds in falcon. Contributed by
Srikanth Sundarrajan
Posted by sh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 36c5f7a..4e8db45 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,8 +111,9 @@
<hcatalog.version>0.11.0</hcatalog.version>
<hadoop-distcp.version>0.9</hadoop-distcp.version>
<jetty.version>6.1.26</jetty.version>
+ <jersey.version>1.9</jersey.version>
<internal.maven.repo>file:///tmp/falcontemprepo</internal.maven.repo>
- <skipCheck>false</skipCheck>
+ <excluded.test.groups>exhaustive</excluded.test.groups>
</properties>
<profiles>
@@ -134,6 +135,18 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
<exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
@@ -175,6 +188,18 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop1.version}</version>
<exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.servlet</artifactId>
@@ -197,6 +222,18 @@
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ </exclusion>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.servlet</artifactId>
@@ -217,6 +254,12 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<classifier>tests</classifier>
@@ -247,7 +290,7 @@
<version>${hadoop.version}</version>
</dependency>
</dependencies>
- </dependencyManagement>
+ </dependencyManagement>
</profile>
<profile>
@@ -266,6 +309,110 @@
</plugins>
</build>
</profile>
+
+ <profile>
+ <id>test-patch</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <useDefaultExcludes>true</useDefaultExcludes>
+ <useMavenDefaultExcludes>true</useMavenDefaultExcludes>
+ <useIdeaDefaultExcludes>true</useIdeaDefaultExcludes>
+ <useEclipseDefaultExcludes>true</useEclipseDefaultExcludes>
+ <excludeSubProjects>true</excludeSubProjects>
+ <excludes>
+ <exclude>*.txt</exclude>
+ <exclude>.git/**</exclude>
+ <exclude>**/.idea/**</exclude>
+ <exclude>**/*.twiki</exclude>
+ <exclude>**/*.iml</exclude>
+ <exclude>**/target/**</exclude>
+ <exclude>**/activemq-data/**</exclude>
+ <exclude>**/build/**</exclude>
+ <exclude>**/*.patch</exclude>
+ <exclude>derby.log</exclude>
+ <exclude>**/logs/**</exclude>
+ <exclude>**/.classpath</exclude>
+ <exclude>**/.project</exclude>
+ <exclude>**/.settings/**</exclude>
+ <exclude>**/test-output/**</exclude>
+ <exclude>**/data.txt</exclude>
+ <exclude>**/maven-eclipse.xml</exclude>
+ <exclude>**/.externalToolBuilders/**</exclude>
+ <exclude>html5-ui/**</exclude>
+ </excludes>
+ </configuration>
+ <executions>
+ <execution>
+ <id>rat-check</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <executions>
+ <execution>
+ <id>checkstyle-check</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ <configuration>
+ <consoleOutput>true</consoleOutput>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <configLocation>falcon/checkstyle.xml</configLocation>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <!--debug>true</debug -->
+ <xmlOutput>true</xmlOutput>
+ <excludeFilterFile>${basedir}/../checkstyle/src/main/resources/falcon/findbugs-exclude.xml</excludeFilterFile>
+ <failOnError>true</failOnError>
+ </configuration>
+ <executions>
+ <execution>
+ <id>findbugs-check</id>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Source code metrics: mvn javancss:report or mvn site -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>javancss-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ <properties>
+ <excluded.test.groups/>
+ </properties>
+ </profile>
</profiles>
<modules>
@@ -443,13 +590,13 @@
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
- <version>1.8</version>
+ <version>${jersey.version}</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-json</artifactId>
- <version>1.8</version>
+ <version>${jersey.version}</version>
</dependency>
<dependency>
@@ -466,6 +613,12 @@
<dependency>
<groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.falcon</groupId>
<artifactId>falcon-metrics</artifactId>
<version>${project.version}</version>
</dependency>
@@ -511,7 +664,7 @@
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
- <version>1.8</version>
+ <version>${jersey.version}</version>
</dependency>
<dependency>
@@ -663,6 +816,12 @@
</dependency>
<dependency>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <version>2.0</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
@@ -845,12 +1004,12 @@
</includes>
</resource>
<resource>
- <directory>..</directory>
- <targetPath>META-INF</targetPath>
- <includes>
- <include>LICENSE.txt</include>
- <include>NOTICE.txt</include>
- </includes>
+ <directory>..</directory>
+ <targetPath>META-INF</targetPath>
+ <includes>
+ <include>LICENSE.txt</include>
+ <include>NOTICE.txt</include>
+ </includes>
</resource>
</resources>
<testResources>
@@ -868,12 +1027,6 @@
</plugin>
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.5</version>
- </plugin>
-
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
@@ -891,10 +1044,16 @@
<version>2.8.1</version>
</plugin>
- <plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.14</version>
+ <version>2.16</version>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.16</version>
</plugin>
<plugin>
@@ -985,11 +1144,6 @@
</plugin>
<plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- </plugin>
-
- <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
@@ -1004,7 +1158,7 @@
<executions>
<execution>
<id>attach-sources</id>
- <phase>package</phase>
+ <phase>site</phase>
<goals>
<goal>jar-no-fork</goal>
</goals>
@@ -1018,19 +1172,13 @@
<executions>
<execution>
<id>attach-javadocs</id>
- <phase>package</phase>
+ <phase>site</phase>
<goals>
<goal>javadoc</goal>
<goal>jar</goal>
</goals>
- <configuration>
- <skip>${skipCheck}</skip>
- </configuration>
- </execution>
+ </execution>
</executions>
- <configuration>
- <skip>${skipCheck}</skip>
- </configuration>
</plugin>
<plugin>
@@ -1047,122 +1195,41 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
+ <version>2.16</version>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
<forkMode>always</forkMode>
<argLine>-Djava.awt.headless=true -Djava.security.krb5.realm= -Djava.security.krb5.kdc=</argLine>
+ <excludedGroups>${excluded.test.groups}</excludedGroups>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <executions>
- <execution>
- <id>deploy</id>
- <phase>deploy</phase>
- <goals>
- <goal>deploy</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.rat</groupId>
- <artifactId>apache-rat-plugin</artifactId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.16</version>
<configuration>
- <useDefaultExcludes>true</useDefaultExcludes>
- <useMavenDefaultExcludes>true</useMavenDefaultExcludes>
- <useIdeaDefaultExcludes>true</useIdeaDefaultExcludes>
- <useEclipseDefaultExcludes>true</useEclipseDefaultExcludes>
- <excludeSubProjects>true</excludeSubProjects>
- <excludes>
- <exclude>*.txt</exclude>
- <exclude>.git/**</exclude>
- <exclude>.idea/**</exclude>
- <exclude>**/*.twiki</exclude>
- <exclude>**/*.iml</exclude>
- <exclude>**/target/**</exclude>
- <exclude>**/activemq-data/**</exclude>
- <exclude>**/build/**</exclude>
- <exclude>**/*.patch</exclude>
- <exclude>derby.log</exclude>
- <exclude>**/logs/**</exclude>
- <exclude>**/.classpath</exclude>
- <exclude>**/.project</exclude>
- <exclude>**/.settings/**</exclude>
- <exclude>**/test-output/**</exclude>
- <exclude>**/data.txt</exclude>
- <exclude>**/maven-eclipse.xml</exclude>
- <exclude>**/.externalToolBuilders/**</exclude>
- <exclude>html5-ui/**</exclude>
- </excludes>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <forkMode>always</forkMode>
+ <argLine>-Djava.security.krb5.realm= -Djava.security.krb5.kdc=
+ -Dhadoop.tmp.dir=${project.build.directory}/tmp-hadoop-${user.name}</argLine>
+ <excludedGroups>${excluded.test.groups}</excludedGroups>
</configuration>
<executions>
<execution>
- <id>rat-check</id>
- <goals>
- <goal>check</goal>
- </goals>
- <phase>verify</phase>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.falcon</groupId>
- <artifactId>checkstyle</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
- <executions>
- <execution>
- <id>checkstyle-check</id>
+ <id>integration-test</id>
<goals>
- <goal>check</goal>
+ <goal>integration-test</goal>
</goals>
- <phase>verify</phase>
- <configuration>
- <consoleOutput>true</consoleOutput>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- <configLocation>falcon/checkstyle.xml</configLocation>
- <failOnViolation>true</failOnViolation>
- <skip>${skipCheck}</skip>
- </configuration>
</execution>
- </executions>
- </plugin>
-
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <!--debug>true</debug -->
- <xmlOutput>true</xmlOutput>
- <excludeFilterFile>${basedir}/../checkstyle/src/main/resources/falcon/findbugs-exclude.xml</excludeFilterFile>
- <failOnError>true</failOnError>
- <skip>${skipCheck}</skip>
- </configuration>
- <executions>
<execution>
- <id>findbugs-check</id>
+ <id>verify</id>
<goals>
- <goal>check</goal>
+ <goal>verify</goal>
</goals>
- <phase>verify</phase>
</execution>
</executions>
</plugin>
- <!-- Source code metrics: mvn javancss:report or mvn site -->
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>javancss-maven-plugin</artifactId>
- </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
----------------------------------------------------------------------
diff --git a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
index 794e585..61ddbdc 100644
--- a/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
+++ b/process/src/test/java/org/apache/falcon/converter/OozieProcessMapperTest.java
@@ -95,12 +95,12 @@ public class OozieProcessMapperTest extends AbstractTestBase {
Cluster cluster = store.get(EntityType.CLUSTER, "corp");
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
- fs = new Path(hdfsUrl).getFileSystem(new Configuration());
+ fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/PROCESS/ext.jar")).close();
Process process = store.get(EntityType.PROCESS, "clicksummary");
Path wfpath = new Path(process.getWorkflow().getPath());
- assert new Path(hdfsUrl).getFileSystem(new Configuration()).mkdirs(wfpath);
+ assert new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration()).mkdirs(wfpath);
}
public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/test-util/pom.xml
----------------------------------------------------------------------
diff --git a/test-util/pom.xml b/test-util/pom.xml
index 6bd4129..4fe72f6 100644
--- a/test-util/pom.xml
+++ b/test-util/pom.xml
@@ -90,6 +90,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
----------------------------------------------------------------------
diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
index c443e05..2b55407 100644
--- a/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
+++ b/test-util/src/main/java/org/apache/falcon/cluster/util/EmbeddedCluster.java
@@ -18,23 +18,21 @@
package org.apache.falcon.cluster.util;
-import java.io.File;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.Interface;
import org.apache.falcon.entity.v0.cluster.Interfaces;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.cluster.Location;
import org.apache.falcon.entity.v0.cluster.Locations;
+import org.apache.falcon.hadoop.JailedFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
/**
* A utility class that doles out an embedded Hadoop cluster with DFS and/or MR.
*/
@@ -45,16 +43,26 @@ public class EmbeddedCluster {
protected EmbeddedCluster() {
}
- private Configuration conf = new Configuration();
- private MiniDFSCluster dfsCluster;
+ //private MiniDFSCluster dfsCluster;
+ protected Configuration conf = newConfiguration();
protected Cluster clusterEntity;
public Configuration getConf() {
return conf;
}
+ public static Configuration newConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.set("fs.jail.impl", JailedFileSystem.class.getName());
+ return configuration;
+ }
+
public static EmbeddedCluster newCluster(final String name) throws Exception {
- return createClusterAsUser(name);
+ return createClusterAsUser(name, false);
+ }
+
+ public static EmbeddedCluster newCluster(final String name, boolean global) throws Exception {
+ return createClusterAsUser(name, global);
}
public static EmbeddedCluster newCluster(final String name,
@@ -63,30 +71,16 @@ public class EmbeddedCluster {
return hdfsUser.doAs(new PrivilegedExceptionAction<EmbeddedCluster>() {
@Override
public EmbeddedCluster run() throws Exception {
- return createClusterAsUser(name);
+ return createClusterAsUser(name, false);
}
});
}
- private static EmbeddedCluster createClusterAsUser(String name) throws IOException {
+ private static EmbeddedCluster createClusterAsUser(String name, boolean global) throws IOException {
EmbeddedCluster cluster = new EmbeddedCluster();
- File target = new File("webapp/target");
- if (!target.exists()) {
- target = new File("target");
- System.setProperty("test.build.data", "target/" + name + "/data");
- } else {
- System.setProperty("test.build.data", "webapp/target/" + name + "/data");
- }
- cluster.conf.set("hadoop.tmp.dir", target.getAbsolutePath());
- cluster.conf.set("hadoop.log.dir", new File(target, "tmp").getAbsolutePath());
- cluster.conf.set("hadoop.proxyuser.oozie.groups", "*");
- cluster.conf.set("hadoop.proxyuser.oozie.hosts", "127.0.0.1");
- cluster.conf.set("hadoop.proxyuser.hdfs.groups", "*");
- cluster.conf.set("hadoop.proxyuser.hdfs.hosts", "127.0.0.1");
- cluster.conf.set("mapreduce.jobtracker.kerberos.principal", "");
- cluster.conf.set("dfs.namenode.kerberos.principal", "");
- cluster.dfsCluster = new MiniDFSCluster(cluster.conf, 1, true, null);
- ProxyUsers.refreshSuperUserGroupsConfiguration(cluster.conf);
+ cluster.conf.set("jail.base", System.getProperty("hadoop.tmp.dir",
+ cluster.conf.get("hadoop.tmp.dir", "/tmp")));
+ cluster.conf.set("fs.default.name", "jail://" + (global ? "global" : name) + ":00");
String hdfsUrl = cluster.conf.get("fs.default.name");
LOG.info("Cluster Namenode = " + hdfsUrl);
cluster.buildClusterObject(name);
@@ -97,7 +91,7 @@ public class EmbeddedCluster {
return FileSystem.get(conf);
}
- private void buildClusterObject(String name) {
+ protected void buildClusterObject(String name) {
clusterEntity = new Cluster();
clusterEntity.setName(name);
clusterEntity.setColo("local");
@@ -105,17 +99,16 @@ public class EmbeddedCluster {
Interfaces interfaces = new Interfaces();
interfaces.getInterfaces().add(newInterface(Interfacetype.WORKFLOW,
- "http://localhost:11000/oozie", "0.1"));
+ "http://localhost:41000/oozie", "0.1"));
String fsUrl = conf.get("fs.default.name");
interfaces.getInterfaces().add(newInterface(Interfacetype.READONLY, fsUrl, "0.1"));
interfaces.getInterfaces().add(newInterface(Interfacetype.WRITE, fsUrl, "0.1"));
interfaces.getInterfaces().add(newInterface(Interfacetype.EXECUTE,
- conf.get("mapred.job.tracker"), "0.1"));
+ "localhost:41021", "0.1"));
interfaces.getInterfaces().add(
newInterface(Interfacetype.REGISTRY, "thrift://localhost:49083", "0.1"));
interfaces.getInterfaces().add(
newInterface(Interfacetype.MESSAGING, "vm://localhost", "0.1"));
-
clusterEntity.setInterfaces(interfaces);
Location location = new Location();
@@ -125,7 +118,7 @@ public class EmbeddedCluster {
locs.getLocations().add(location);
location = new Location();
location.setName("working");
- location.setPath("/projects/falcon/working");
+ location.setPath("/project/falcon/working");
locs.getLocations().add(location);
clusterEntity.setLocations(locs);
}
@@ -140,7 +133,7 @@ public class EmbeddedCluster {
}
public void shutdown() {
- dfsCluster.shutdown();
+ //dfsCluster.shutdown();
}
public Cluster getCluster() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/test-util/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/test-util/src/main/resources/core-site.xml b/test-util/src/main/resources/core-site.xml
new file mode 100644
index 0000000..da00644
--- /dev/null
+++ b/test-util/src/main/resources/core-site.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+ <property>
+ <name>fs.jail.impl</name>
+ <value>org.apache.falcon.hadoop.JailedFileSystem</value>
+ </property>
+
+ <property>
+ <name>mapreduce.framework.name</name>
+ <value>unittests</value>
+ </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/pom.xml
----------------------------------------------------------------------
diff --git a/webapp/pom.xml b/webapp/pom.xml
index 0c2d844..8c37409 100644
--- a/webapp/pom.xml
+++ b/webapp/pom.xml
@@ -143,6 +143,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -218,19 +222,17 @@
<executions>
<execution>
<id>uber-javadocs</id>
- <phase>package</phase>
+ <phase>site</phase>
<goals>
<goal>javadoc</goal>
<goal>jar</goal>
</goals>
<configuration>
- <skip>${skipCheck}</skip>
<includeTransitiveDependencySources>false</includeTransitiveDependencySources>
<includeDependencySources>true</includeDependencySources>
<dependencySourceIncludes>
<dependencySourceInclude>org.apache.falcon:*</dependencySourceInclude>
</dependencySourceIncludes>
- <skip>${skipCheck}</skip>
</configuration>
</execution>
</executions>
@@ -317,6 +319,23 @@
<outputDirectory>${project.build.directory}/libext</outputDirectory>
<destFileName>kahadb.jar</destFileName>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ <version>${project.version}</version>
+ <overWrite>true</overWrite>
+ <outputDirectory>${project.build.directory}/falcon-webapp-${project.version}/WEB-INF/lib</outputDirectory>
+ <destFileName>falcon-hadoop-dependencies-${project.version}.jar</destFileName>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pig</groupId>
+ <artifactId>pig</artifactId>
+ <version>0.11.1</version>
+ <type>jar</type>
+ <overWrite>false</overWrite>
+ <outputDirectory>${project.build.directory}/sharelib</outputDirectory>
+ <destFileName>pig.jar</destFileName>
+ </artifactItem>
</artifactItems>
</configuration>
</execution>
@@ -346,31 +365,6 @@
</plugin>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <version>2.15</version>
- <configuration>
- <redirectTestOutputToFile>true</redirectTestOutputToFile>
- <forkMode>always</forkMode>
- <argLine>-Djava.security.krb5.realm= -Djava.security.krb5.kdc=</argLine>
- </configuration>
- <executions>
- <execution>
- <id>integration-test</id>
- <goals>
- <goal>integration-test</goal>
- </goals>
- </execution>
- <execution>
- <id>verify</id>
- <goals>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
-
- <plugin>
<groupId>org.mortbay.jetty</groupId>
<artifactId>maven-jetty-plugin</artifactId>
<version>${jetty.version}</version>
@@ -434,7 +428,6 @@
<goal>run</goal>
</goals>
<configuration>
- <skip>${skipCheck}</skip>
<daemon>true</daemon>
</configuration>
</execution>
@@ -447,6 +440,7 @@
</execution>
</executions>
</plugin>
+
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/conf/oozie/conf/action-conf/hive.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/action-conf/hive.xml b/webapp/src/conf/oozie/conf/action-conf/hive.xml
index e5aef7d..e734089 100644
--- a/webapp/src/conf/oozie/conf/action-conf/hive.xml
+++ b/webapp/src/conf/oozie/conf/action-conf/hive.xml
@@ -30,7 +30,7 @@
<property>
<name>fs.default.name</name>
- <value>hdfs://localhost:41020</value>
+ <value>jail://global:00</value>
</property>
<!-- Forcing the creation of the db dir under target so mvn clean will clean up -->
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/conf/oozie/conf/hadoop-conf/core-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/hadoop-conf/core-site.xml b/webapp/src/conf/oozie/conf/hadoop-conf/core-site.xml
index 35078c7..bc8fa99 100644
--- a/webapp/src/conf/oozie/conf/hadoop-conf/core-site.xml
+++ b/webapp/src/conf/oozie/conf/hadoop-conf/core-site.xml
@@ -36,7 +36,7 @@
<property>
<name>mapreduce.framework.name</name>
- <value>yarn</value>
+ <value>unittests</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/conf/oozie/conf/oozie-site.xml
----------------------------------------------------------------------
diff --git a/webapp/src/conf/oozie/conf/oozie-site.xml b/webapp/src/conf/oozie/conf/oozie-site.xml
index 48408ba..e5f404a 100644
--- a/webapp/src/conf/oozie/conf/oozie-site.xml
+++ b/webapp/src/conf/oozie/conf/oozie-site.xml
@@ -473,6 +473,15 @@
</description>
</property>
+ <property>
+ <name>oozie.service.HadoopAccessorService.supported.filesystems</name>
+ <value>hdfs,hftp,webhdfs,jail</value>
+ <description>
+ Lists the filesystem schemes supported for federation. If the wildcard "*" is specified,
+ then ALL file schemes will be allowed.
+ </description>
+ </property>
+
<!-- Proxyuser Configuration -->
<property>
<name>oozie.service.ProxyUserService.proxyuser.${user.name}.hosts</name>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
index c4d6671..9909140 100644
--- a/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
+++ b/webapp/src/test/java/org/apache/falcon/catalog/HiveCatalogServiceIT.java
@@ -48,7 +48,7 @@ public class HiveCatalogServiceIT {
private static final String DATABASE_NAME = "falcon_db";
private static final String TABLE_NAME = "falcon_table";
private static final String EXTERNAL_TABLE_NAME = "falcon_external";
- private static final String EXTERNAL_TABLE_LOCATION = "hdfs://localhost:41020/falcon/staging/falcon_external";
+ private static final String EXTERNAL_TABLE_LOCATION = "jail://global:00/falcon/staging/falcon_external";
private HiveCatalogService hiveCatalogService;
private HCatClient client;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index 4730728..0767a76 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -35,6 +35,7 @@ import java.util.Map;
*
* todo: Refactor both the classes to move this methods to helper;
*/
+@Test(groups = {"exhaustive"})
public class FalconCLIIT {
private InMemoryWriter stream = new InMemoryWriter(System.out);
@@ -47,7 +48,6 @@ public class FalconCLIIT {
TestContext.prepare();
}
- @Test(enabled = TEST_ENABLED)
public void testSubmitEntityValidCommands() throws Exception {
FalconCLI.OUT.set(stream);
@@ -60,7 +60,7 @@ public class FalconCLIIT {
Assert.assertEquals(
0,
executeWithURL("entity -submit -type cluster -file " + filePath));
- context.setCluster(filePath);
+ context.setCluster(overlay.get("cluster"));
Assert.assertEquals(stream.buffer.toString().trim(),
"default/Submit successful (cluster) " + context.getClusterName());
@@ -90,17 +90,14 @@ public class FalconCLIIT {
+ overlay.get("processName"));
}
- @Test(enabled = TEST_ENABLED)
public void testListWithEmptyConfigStore() throws Exception {
Assert.assertEquals(
0,
executeWithURL("entity -list -type process "));
}
- @Test(enabled = TEST_ENABLED)
public void testSubmitAndScheduleEntityValidCommands() throws Exception {
- Thread.sleep(5000);
String filePath;
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -109,7 +106,7 @@ public class FalconCLIIT {
Assert.assertEquals(-1,
executeWithURL("entity -submitAndSchedule -type cluster -file "
+ filePath));
- context.setCluster(filePath);
+ context.setCluster(overlay.get("cluster"));
filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
@@ -131,11 +128,8 @@ public class FalconCLIIT {
Assert.assertEquals(0,
executeWithURL("entity -submitAndSchedule -type process -file "
+ filePath));
-
- Thread.sleep(5000);
}
- @Test(enabled = TEST_ENABLED)
public void testValidateValidCommands() throws Exception {
String filePath;
@@ -146,11 +140,11 @@ public class FalconCLIIT {
Assert.assertEquals(0,
executeWithURL("entity -validate -type cluster -file "
+ filePath));
- context.setCluster(filePath);
+ context.setCluster(overlay.get("cluster"));
Assert.assertEquals(
0,
executeWithURL("entity -submit -type cluster -file " + filePath));
- context.setCluster(filePath);
+ context.setCluster(overlay.get("cluster"));
filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
@@ -168,13 +162,12 @@ public class FalconCLIIT {
Assert.assertEquals(0,
executeWithURL("entity -validate -type process -file "
+ filePath));
+
Assert.assertEquals(
0,
executeWithURL("entity -submit -type process -file " + filePath));
-
}
- @Test(enabled = TEST_ENABLED)
public void testDefinitionEntityValidCommands() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -200,7 +193,6 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testScheduleEntityValidCommands() throws Exception {
TestContext context = new TestContext();
@@ -222,10 +214,8 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testSuspendResumeStatusEntityValidCommands() throws Exception {
- Thread.sleep(5000);
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
submitTestFiles(context, overlay);
@@ -291,15 +281,12 @@ public class FalconCLIIT {
executeWithURL("entity -status -type process -name "
+ overlay.get("processName")));
- Thread.sleep(5000);
}
- @Test(enabled = TEST_ENABLED)
public void testSubCommandPresence() throws Exception {
Assert.assertEquals(-1, executeWithURL("entity -type cluster "));
}
- @Test(enabled = TEST_ENABLED)
public void testDeleteEntityValidCommands() throws Exception {
TestContext context = new TestContext();
@@ -338,7 +325,6 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testInvalidCLIEntitycommands() throws Exception {
TestContext context = new TestContext();
@@ -351,7 +337,6 @@ public class FalconCLIIT {
executeWithURL("entity -schedule -type feed -file " + "name"));
}
- @Test(enabled = TEST_ENABLED)
public void testInstanceRunningAndStatusCommands() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -381,9 +366,7 @@ public class FalconCLIIT {
+ " -start " + START_INSTANCE));
}
- @Test(enabled = TEST_ENABLED)
public void testInstanceSuspendAndResume() throws Exception {
- Thread.sleep(5000);
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
submitTestFiles(context, overlay);
@@ -402,12 +385,10 @@ public class FalconCLIIT {
executeWithURL("instance -resume -type process -name "
+ overlay.get("processName")
+ " -start " + START_INSTANCE + " -end " + START_INSTANCE));
- Thread.sleep(5000);
}
private static final String START_INSTANCE = "2012-04-20T00:00Z";
- @Test(enabled = TEST_ENABLED)
public void testInstanceKillAndRerun() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -432,7 +413,6 @@ public class FalconCLIIT {
+ createTempJobPropertiesFile()));
}
- @Test(enabled = TEST_ENABLED)
public void testContinue() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -456,7 +436,6 @@ public class FalconCLIIT {
+ " -start " + START_INSTANCE));
}
- @Test(enabled = TEST_ENABLED)
public void testInvalidCLIInstanceCommands() throws Exception {
// no command
Assert.assertEquals(-1, executeWithURL(" -kill -type process -name "
@@ -475,7 +454,6 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testFalconURL() throws Exception {
Assert.assertEquals(-1, new FalconCLI()
.run(("instance -status -type process -name " + "processName"
@@ -491,7 +469,6 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testClientProperties() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -510,7 +487,6 @@ public class FalconCLIIT {
}
- @Test(enabled = TEST_ENABLED)
public void testGetVersion() throws Exception {
Assert.assertEquals(0,
new FalconCLI().run("admin -version".split("\\s")));
@@ -519,7 +495,6 @@ public class FalconCLIIT {
new FalconCLI().run("admin -stack".split("\\s")));
}
- @Test(enabled = TEST_ENABLED)
public void testInstanceGetLogs() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -536,7 +511,6 @@ public class FalconCLIIT {
}
-
private int executeWithURL(String command) throws Exception {
return new FalconCLI()
.run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
@@ -554,14 +528,14 @@ public class FalconCLIIT {
return tmpFile.getAbsolutePath();
}
- public void submitTestFiles(TestContext context, Map<String, String> overlay) throws Exception {
+ private void submitTestFiles(TestContext context, Map<String, String> overlay) throws Exception {
String filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(),
overlay);
Assert.assertEquals(
0,
executeWithURL("entity -submit -type cluster -file " + filePath));
- context.setCluster(filePath);
+ context.setCluster(overlay.get("cluster"));
filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Assert.assertEquals(0,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
new file mode 100644
index 0000000..55f240f
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.cli;
+
+import org.apache.falcon.resource.TestContext;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+/**
+ * Smoke Test for Falcon CLI.
+ */
+public class FalconCLISmokeIT {
+
+ private static final String START_INSTANCE = "2012-04-20T00:00Z";
+
+ @BeforeClass
+ public void prepare() throws Exception {
+ TestContext.prepare();
+ }
+
+ @Test
+ public void testSubmitAndScheduleEntityValidCommands() throws Exception {
+
+ String filePath;
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ filePath = context.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ Assert.assertEquals(-1,
+ executeWithURL("entity -submitAndSchedule -type cluster -file "
+ + filePath));
+ context.setCluster(overlay.get("cluster"));
+
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type feed -file "
+ + filePath));
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type feed -file "
+ + filePath));
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submit -type feed -file " + filePath));
+
+ filePath = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submit -type feed -file " + filePath));
+
+ filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -validate -type process -file "
+ + filePath));
+
+ filePath = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type process -file "
+ + filePath));
+
+ context.waitForProcessWFtoStart();
+
+ Assert.assertEquals(0,
+ executeWithURL("entity -definition -type cluster -name "
+ + overlay.get("cluster")));
+
+ Assert.assertEquals(0,
+ executeWithURL("instance -status -type feed -name "
+ + overlay.get("outputFeedName")
+ + " -start " + START_INSTANCE));
+
+ Assert.assertEquals(0,
+ executeWithURL("instance -running -type process -name "
+ + overlay.get("processName")));
+
+ }
+
+ private int executeWithURL(String command) throws Exception {
+ return new FalconCLI()
+ .run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
index 9b672f4..37226e2 100644
--- a/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
+++ b/webapp/src/test/java/org/apache/falcon/lifecycle/TableStorageFeedEvictorIT.java
@@ -71,7 +71,7 @@ public class TableStorageFeedEvictorIT {
private static final String DATABASE_NAME = "falcon_db";
private static final String TABLE_NAME = "clicks";
private static final String EXTERNAL_TABLE_NAME = "clicks_external";
- private static final String STORAGE_URL = "hdfs://localhost:41020";
+ private static final String STORAGE_URL = "jail://global:00";
private static final String EXTERNAL_TABLE_LOCATION = STORAGE_URL + "/falcon/staging/clicks_external/";
private final InMemoryWriter stream = new InMemoryWriter(System.out);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
deleted file mode 100644
index e3cd914..0000000
--- a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.cluster.util.StandAloneCluster;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.resource.TestContext;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Test for LogMover.
- * Requires Oozie to be running on localhost.
- */
-@Test
-public class LogMoverIT {
-
- private static final ConfigurationStore STORE = ConfigurationStore.get();
- private static final String PROCESS_NAME = "testProcess" + System.currentTimeMillis();
- private static EmbeddedCluster testCluster = null;
- private static Process testProcess = null;
- private static FileSystem fs;
-
- @BeforeClass
- public void setup() throws Exception {
- Map<String, String> overlay = new HashMap<String, String>();
- overlay.put("cluster", "testCluster");
- TestContext context = new TestContext();
- String file = context.
- overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
- testCluster = StandAloneCluster.newCluster(file);
- STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
-/*
- new File("target/libs").mkdirs();
- StartupProperties.get().setProperty("system.lib.location", "target/libs");
- SharedLibraryHostingService listener = new SharedLibraryHostingService();
- listener.onAdd(testCluster.getCluster());
-*/
- fs = FileSystem.get(testCluster.getConf());
- fs.mkdirs(new Path("/workflow/lib"));
-
- fs.copyFromLocalFile(
- new Path(LogMoverIT.class.getResource(
- "/org/apache/falcon/logging/workflow.xml").toURI()),
- new Path("/workflow"));
- fs.copyFromLocalFile(
- new Path(LogMoverIT.class.getResource(
- "/org/apache/falcon/logging/java-test.jar").toURI()),
- new Path("/workflow/lib"));
-
- testProcess = new ProcessEntityParser().parse(LogMoverIT.class
- .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
- testProcess.setName(PROCESS_NAME);
- }
-
- @AfterClass
- public void tearDown() {
- testCluster.shutdown();
- }
-
- @Test (enabled = false)
- public void testLogMover() throws Exception {
- CurrentUser.authenticate(System.getProperty("user.name"));
- OozieWorkflowEngine engine = new OozieWorkflowEngine();
- String path = StartupProperties.get().getProperty("system.lib.location");
- if (!new File("target/libs").exists()) {
- Assert.assertTrue(new File("target/libs").mkdirs());
- }
- StartupProperties.get().setProperty("system.lib.location", "target/libs");
- engine.schedule(testProcess);
- StartupProperties.get().setProperty("system.lib.location", path);
-
- OozieClient client = new OozieClient(
- ClusterHelper.getOozieUrl(testCluster.getCluster()));
- List<WorkflowJob> jobs;
- while (true) {
- jobs = client.getJobsInfo(OozieClient.FILTER_NAME + "="
- + "FALCON_PROCESS_DEFAULT_" + PROCESS_NAME);
- if (jobs.size() > 0) {
- break;
- } else {
- Thread.sleep(1000);
- }
- }
-
- WorkflowJob job = jobs.get(0);
- while (true) {
- if (!(job.getStatus() == WorkflowJob.Status.RUNNING || job
- .getStatus() == WorkflowJob.Status.PREP)) {
- break;
- } else {
- Thread.sleep(1000);
- job = client.getJobInfo(job.getId());
- }
- }
-
- Path oozieLogPath = new Path(getLogPath(),
- "job-2010-01-01-01-00/000/oozie.log");
- Assert.assertTrue(fs.exists(oozieLogPath));
-
- testLogMoverWithNextRunId(job.getId());
- testLogMoverWithNextRunIdWithEngine(job.getId());
- }
-
- private Path getLogPath() throws FalconException {
- Path stagingPath = EntityUtil.getLogPath(testCluster.getCluster(), testProcess);
- return new Path(ClusterHelper.getStorageUrl(testCluster
- .getCluster()), stagingPath);
- }
-
- private void testLogMoverWithNextRunId(String jobId) throws Exception {
- LogMover.main(new String[]{"-workflowEngineUrl",
- ClusterHelper.getOozieUrl(testCluster.getCluster()),
- "-subflowId", jobId + "@user-workflow", "-runId", "1",
- "-logDir", getLogPath().toString() + "/job-2010-01-01-01-00",
- "-status", "SUCCEEDED", "-entityType", "process", });
-
- Path oozieLogPath = new Path(getLogPath(),
- "job-2010-01-01-01-00/001/oozie.log");
- Assert.assertTrue(fs.exists(oozieLogPath));
- }
-
- private void testLogMoverWithNextRunIdWithEngine(String jobId) throws Exception {
- LogMover.main(new String[]{"-workflowEngineUrl",
- ClusterHelper.getOozieUrl(testCluster.getCluster()),
- "-subflowId", jobId + "@user-workflow", "-runId", "1",
- "-logDir", getLogPath().toString() + "/job-2010-01-01-01-00",
- "-status", "SUCCEEDED", "-entityType", "process",
- "-userWorkflowEngine", "oozie", });
-
- Path oozieLogPath = new Path(getLogPath(),
- "job-2010-01-01-01-00/001/oozie.log");
- Assert.assertTrue(fs.exists(oozieLogPath));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
deleted file mode 100644
index 4c3ce97..0000000
--- a/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.cluster.util.StandAloneCluster;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.resource.InstancesResult.Instance;
-import org.apache.falcon.resource.InstancesResult.InstanceAction;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.falcon.resource.TestContext;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Test for LogProvider.
- */
-public class LogProviderIT {
-
- private static final ConfigurationStore STORE = ConfigurationStore.get();
- private static EmbeddedCluster testCluster = null;
- private static Process testProcess = null;
- private static final String PROCESS_NAME = "testProcess";
- private static FileSystem fs;
- private Instance instance;
-
- @BeforeClass
- public void setup() throws Exception {
- Map<String, String> overlay = new HashMap<String, String>();
- overlay.put("cluster", "logProviderTest");
- overlay.put("colo", "gs");
- TestContext context = new TestContext();
- String file = context.
- overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
- testCluster = StandAloneCluster.newCluster(file);
- cleanupStore();
- STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
- fs = FileSystem.get(testCluster.getConf());
- Path instanceLogPath = new Path(
- "/projects/falcon/staging/falcon/workflows/process/" + PROCESS_NAME
- + "/logs/job-2010-01-01-01-00/000");
- fs.mkdirs(instanceLogPath);
- fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
- fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
- fs.createNewFile(new Path(instanceLogPath, "mr_Action_FAILED.log"));
- fs.createNewFile(new Path(instanceLogPath, "mr_Action2_SUCCEEDED.log"));
-
- fs.mkdirs(new Path("/projects/falcon/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/001"));
- fs.mkdirs(new Path("/projects/falcon/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/002"));
- Path run3 = new Path("/projects/falcon/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/003");
- fs.mkdirs(run3);
- fs.createNewFile(new Path(run3, "oozie.log"));
-
- testProcess = new ProcessEntityParser().parse(LogProviderIT.class
- .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
- testProcess.setName(PROCESS_NAME);
- STORE.publish(EntityType.PROCESS, testProcess);
- }
-
- @BeforeMethod
- public void setInstance() {
- instance = new Instance();
- instance.status = WorkflowStatus.SUCCEEDED;
- instance.instance = "2010-01-01T01:00Z";
- instance.cluster = "logProviderTest";
- instance.logFile = "http://localhost:41000/oozie/wflog";
- }
-
- private void cleanupStore() throws FalconException {
- for (EntityType type : EntityType.values()) {
- Collection<String> entities = STORE.getEntities(type);
- for (String entity : entities) {
- STORE.remove(type, entity);
- }
- }
- }
-
- @Test
- public void testLogProviderWithValidRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- Instance instanceWithLog = provider.populateLogUrls(testProcess,
- instance, "0");
- Assert.assertEquals(
- instance.logFile,
- "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
- + "job-2010-01-01-01-00/000/oozie.log");
-
- InstanceAction action = instanceWithLog.actions[0];
- Assert.assertEquals(action.action, "mr_Action2");
- Assert.assertEquals(action.status, "SUCCEEDED");
- Assert.assertEquals(
- action.logFile,
- "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
- + "job-2010-01-01-01-00/000/mr_Action2_SUCCEEDED.log");
-
- action = instanceWithLog.actions[1];
- Assert.assertEquals(action.action, "mr_Action");
- Assert.assertEquals(action.status, "FAILED");
- Assert.assertEquals(
- action.logFile,
- "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
- + "job-2010-01-01-01-00/000/mr_Action_FAILED.log");
- }
-
- @Test
- public void testLogProviderWithInvalidRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- provider.populateLogUrls(testProcess, instance, "x");
- Assert.assertEquals(instance.logFile,
- "http://localhost:41000/oozie/wflog");
- }
-
- @Test
- public void testLogProviderWithUnavailableRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- instance.logFile = null;
- provider.populateLogUrls(testProcess, instance, "7");
- Assert.assertEquals(instance.logFile, "-");
- }
-
- @Test
- public void testLogProviderWithEmptyRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- instance.logFile = null;
- provider.populateLogUrls(testProcess, instance, null);
- Assert.assertEquals(
- instance.logFile,
- "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
- + "job-2010-01-01-01-00/003/oozie.log");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
index 58ae4ba..1f4e9e8 100644
--- a/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
+++ b/webapp/src/test/java/org/apache/falcon/process/PigProcessIT.java
@@ -58,8 +58,8 @@ public class PigProcessIT {
overlay = context.getUniqueOverlay();
- String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
- context.setCluster(filePath);
+ String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
+ context.setCluster(overlay.get("cluster"));
final Cluster cluster = context.getCluster().getCluster();
final String storageUrl = ClusterHelper.getStorageUrl(cluster);
@@ -88,7 +88,7 @@ public class PigProcessIT {
public void testSubmitAndSchedulePigProcess() throws Exception {
overlay.put("cluster", "primary-cluster");
- String filePath = context.overlayParametersOverTemplate(CLUSTER_TEMPLATE, overlay);
+ String filePath = context.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay);
Assert.assertEquals(0, TestContext.executeWithURL("entity -submit -type cluster -file " + filePath));
// context.setCluster(filePath);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index aa059bd..1ceaabf 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -17,10 +17,15 @@
*/
package org.apache.falcon.resource;
-import java.io.*;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
import java.util.regex.Pattern;
import javax.servlet.ServletInputStream;
@@ -28,11 +33,16 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
+import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.feed.*;
+import org.apache.falcon.entity.v0.feed.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.feed.Location;
+import org.apache.falcon.entity.v0.feed.LocationType;
+import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Property;
@@ -44,7 +54,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.OozieClient;
@@ -53,13 +62,17 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.sun.jersey.api.client.ClientResponse;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringReader;
/**
* Test class for Entity REST APIs.
*
* Tests should be enabled only in local environments as they need running instance of the web server.
*/
+@Test(groups = {"exhaustive"})
public class EntityManagerJerseyIT {
private static final int ONE_HR = 2 * 24 * 60 * 60 * 1000;
@@ -69,7 +82,7 @@ public class EntityManagerJerseyIT {
TestContext.prepare();
}
- private void assertLibs(FileSystem fs, Path path) throws IOException {
+ static void assertLibs(FileSystem fs, Path path) throws IOException {
FileStatus[] libs = fs.listStatus(path);
Assert.assertNotNull(libs);
Assert.assertEquals(libs.length, 1);
@@ -102,7 +115,7 @@ public class EntityManagerJerseyIT {
String tmpFileName = context.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));
Location location = new Location();
- location.setPath("fsext://localhost:41020/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}");
+ location.setPath("fsext://global:00/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}");
location.setType(LocationType.DATA);
Cluster cluster = feed.getClusters().getClusters().get(0);
cluster.setLocations(new Locations());
@@ -138,7 +151,8 @@ public class EntityManagerJerseyIT {
Map<String, String> overlay = context.getUniqueOverlay();
String tmpFileName = context.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
- updateEndtime(process);
+ Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+ processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
File tmpFile = context.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
@@ -148,11 +162,22 @@ public class EntityManagerJerseyIT {
Assert.assertEquals(bundles.size(), 1);
Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
- Feed feed = (Feed) getDefinition(context, EntityType.FEED, context.outputFeedName);
+ ClientResponse response = context.service.path("api/entities/definition/feed/"
+ + context.outputFeedName).header(
+ "Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+ Feed feed = (Feed) EntityType.FEED.getUnmarshaller()
+ .unmarshal(new StringReader(response.getEntity(String.class)));
//change output feed path and update feed as another user
feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
- update(context, feed);
+ tmpFile = context.getTempFile();
+ EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
+ response = context.service.path("api/entities/update/feed/"
+ + context.outputFeedName).header("Remote-User",
+ TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+ .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+ context.assertSuccessful(response);
bundles = context.getBundles();
Assert.assertEquals(bundles.size(), 2);
@@ -176,7 +201,6 @@ public class EntityManagerJerseyIT {
contexts.remove();
}
- @Test(enabled = false)
public void testOptionalInput() throws Exception {
TestContext context = newContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -200,7 +224,6 @@ public class EntityManagerJerseyIT {
context.waitForWorkflowStart(context.processName);
}
- @Test
public void testProcessDeleteAndSchedule() throws Exception {
//Submit process with invalid property so that coord submit fails and bundle goes to failed state
TestContext context = newContext();
@@ -214,7 +237,7 @@ public class EntityManagerJerseyIT {
File tmpFile = context.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
context.scheduleProcess(tmpFile.getAbsolutePath(), overlay);
- context.waitForBundleStart(Status.FAILED);
+ context.waitForBundleStart(Status.FAILED, Status.KILLED);
//Delete and re-submit the process with correct workflow
ClientResponse clientRepsonse = context.service.path("api/entities/delete/process/"
@@ -267,12 +290,18 @@ public class EntityManagerJerseyIT {
OozieClient ozClient = context.getOozieClient();
String coordId = ozClient.getBundleJobInfo(bundles.get(0).getId()).getCoordinators().get(0).getId();
- Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
+ ClientResponse response = context.service.path("api/entities/definition/process/"
+ + context.processName).header(
+ "Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller()
+ .unmarshal(new StringReader(response.getEntity(String.class)));
+
String feed3 = "f3" + System.currentTimeMillis();
Map<String, String> overlay = new HashMap<String, String>();
overlay.put("inputFeedName", feed3);
overlay.put("cluster", context.clusterName);
- ClientResponse response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
+ response = context.submitToFalcon(TestContext.FEED_TEMPLATE1, overlay, EntityType.FEED);
context.assertSuccessful(response);
Input input = new Input();
@@ -282,34 +311,48 @@ public class EntityManagerJerseyIT {
input.setEnd("today(20,20)");
process.getInputs().getInputs().add(input);
- Date endTime = getEndTime();
- updateEndtime(process);
- update(context, process, endTime);
+ Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+ processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
+ File tmpFile = context.getTempFile();
+ EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+ response = context.service.path("api/entities/update/process/"
+ + context.processName).header("Remote-User",
+ TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+ .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+ context.assertSuccessful(response);
//Assert that update creates new bundle and old coord is running
bundles = context.getBundles();
Assert.assertEquals(bundles.size(), 2);
- CoordinatorJob coord = ozClient.getCoordJobInfo(coordId);
- Assert.assertEquals(coord.getStatus(), Status.RUNNING);
- Assert.assertEquals(coord.getEndTime(), endTime);
+ Assert.assertEquals(ozClient.getCoordJobInfo(coordId).getStatus(), Status.RUNNING);
}
- @Test
public void testProcessEndtimeUpdate() throws Exception {
TestContext context = newContext();
context.scheduleProcess();
context.waitForBundleStart(Job.Status.RUNNING);
- Process process = (Process) getDefinition(context, EntityType.PROCESS, context.processName);
- updateEndtime(process);
- update(context, process);
+ ClientResponse response = context.service.path("api/entities/definition/process/"
+ + context.processName).header(
+ "Remote-User", TestContext.REMOTE_USER)
+ .accept(MediaType.TEXT_XML).get(ClientResponse.class);
+ Process process = (Process) EntityType.PROCESS.getUnmarshaller()
+ .unmarshal(new StringReader(response.getEntity(String.class)));
+
+ Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
+ processValidity.setEnd(new Date(new Date().getTime() + 60 * 60 * 1000));
+ File tmpFile = context.getTempFile();
+ EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
+ response = context.service.path("api/entities/update/process/" + context.processName).header("Remote-User",
+ TestContext.REMOTE_USER).accept(MediaType.TEXT_XML)
+ .post(ClientResponse.class, context.getServletInputStream(tmpFile.getAbsolutePath()));
+ context.assertSuccessful(response);
//Assert that update does not create new bundle
List<BundleJob> bundles = context.getBundles();
Assert.assertEquals(bundles.size(), 1);
}
- @Test
public void testStatus() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -332,7 +375,6 @@ public class EntityManagerJerseyIT {
}
- @Test
public void testIdempotentSubmit() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -345,7 +387,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(response);
}
- @Test
public void testNotFoundStatus() {
TestContext context = newContext();
ClientResponse response;
@@ -358,7 +399,6 @@ public class EntityManagerJerseyIT {
Assert.assertEquals(response.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
}
- @Test
public void testVersion() {
TestContext context = newContext();
ClientResponse response;
@@ -379,7 +419,6 @@ public class EntityManagerJerseyIT {
"No deploy.mode found in /api/admin/version");
}
- @Test
public void testValidate() {
TestContext context = newContext();
ServletInputStream stream = context.getServletInputStream(getClass().
@@ -394,7 +433,6 @@ public class EntityManagerJerseyIT {
context.assertFailure(clientRepsonse);
}
- @Test
public void testClusterValidate() throws Exception {
TestContext context = newContext();
ClientResponse clientRepsonse;
@@ -410,7 +448,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(clientRepsonse);
}
- @Test
public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception {
TestContext context = newContext();
ClientResponse clientRepsonse;
@@ -448,7 +485,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(clientRepsonse);
}
- @Test
public void testSubmit() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -467,7 +503,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(response);
}
- @Test
public void testGetEntityDefinition() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -494,7 +529,6 @@ public class EntityManagerJerseyIT {
}
}
- @Test
public void testInvalidGetEntityDefinition() {
TestContext context = newContext();
ClientResponse clientRepsonse = context.service
@@ -504,7 +538,6 @@ public class EntityManagerJerseyIT {
context.assertFailure(clientRepsonse);
}
- @Test
public void testScheduleSuspendResume() throws Exception {
TestContext context = newContext();
context.scheduleProcess();
@@ -522,7 +555,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(clientRepsonse);
}
- @Test(enabled = true)
public void testFeedSchedule() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -543,7 +575,7 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(clientRepsonse);
}
- private List<Path> createTestData(TestContext context) throws Exception {
+ static List<Path> createTestData(TestContext context) throws Exception {
List<Path> list = new ArrayList<Path>();
FileSystem fs = context.cluster.getFileSystem();
fs.mkdirs(new Path("/user/guest"));
@@ -593,7 +625,6 @@ public class EntityManagerJerseyIT {
return list;
}
- @Test
public void testDeleteDataSet() throws Exception {
TestContext context = newContext();
ClientResponse response;
@@ -612,7 +643,6 @@ public class EntityManagerJerseyIT {
context.assertSuccessful(response);
}
- @Test
public void testDelete() throws Exception {
TestContext context = newContext();
ClientResponse response;
[3/3] git commit: FALCON-123 Improve build speeds in falcon.
Contributed by Srikanth Sundarrajan
Posted by sh...@apache.org.
FALCON-123 Improve build speeds in falcon. Contributed by Srikanth Sundarrajan
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d1642bea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d1642bea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d1642bea
Branch: refs/heads/master
Commit: d1642beab5d5073dd57c43137f0b7b22cb87ecf9
Parents: d555dd5
Author: Shwetha GS <sh...@gmail.com>
Authored: Wed Jan 22 15:53:21 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Wed Jan 22 15:53:21 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
acquisition/pom.xml | 7 +
archival/pom.xml | 7 +
build-tools/pom.xml | 8 +
.../java/org/apache/falcon/cli/FalconCLI.java | 11 +-
.../falcon/cleanup/LogCleanupServiceTest.java | 35 +-
.../falcon/entity/FileSystemStorageTest.java | 26 +-
common/src/test/resources/runtime.properties | 25 ++
.../falcon/converter/OozieFeedMapperTest.java | 3 +-
hadoop-dependencies/pom.xml | 52 +++
.../apache/falcon/hadoop/JailedFileSystem.java | 195 +++++++++++
...op.mapreduce.protocol.ClientProtocolProvider | 14 +
.../mapred/ClassicClientProtocolProvider.java | 53 +++
hadoop-webapp/pom.xml | 51 ++-
.../org/apache/falcon/JobTrackerService.java | 28 ++
.../falcon/listener/HadoopStartupListener.java | 95 +-----
hadoop-webapp/src/main/resources/core-site.xml | 8 +-
hadoop-webapp/src/main/resources/hive-site.xml | 2 +-
hadoop-webapp/src/main/resources/yarn-site.xml | 2 +-
.../apache/hadoop/mapred/LocalRunnerTest.java | 46 +++
.../org/apache/hadoop/mapred/LocalRunnerV1.java | 208 ++++++++++++
.../org/apache/hadoop/mapred/LocalRunnerV2.java | 242 ++++++++++++++
messaging/pom.xml | 6 +
.../messaging/FalconTopicProducerTest.java | 3 +-
.../falcon/messaging/FeedProducerTest.java | 2 +-
.../falcon/messaging/ProcessProducerTest.java | 2 +-
metrics/pom.xml | 6 +
oozie-el-extensions/pom.xml | 6 +
pom.xml | 321 +++++++++++--------
.../converter/OozieProcessMapperTest.java | 4 +-
test-util/pom.xml | 5 +
.../falcon/cluster/util/EmbeddedCluster.java | 61 ++--
test-util/src/main/resources/core-site.xml | 31 ++
webapp/pom.xml | 52 ++-
webapp/src/conf/oozie/conf/action-conf/hive.xml | 2 +-
.../conf/oozie/conf/hadoop-conf/core-site.xml | 2 +-
webapp/src/conf/oozie/conf/oozie-site.xml | 9 +
.../falcon/catalog/HiveCatalogServiceIT.java | 2 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 42 +--
.../org/apache/falcon/cli/FalconCLISmokeIT.java | 100 ++++++
.../lifecycle/TableStorageFeedEvictorIT.java | 2 +-
.../org/apache/falcon/logging/LogMoverIT.java | 171 ----------
.../apache/falcon/logging/LogProviderIT.java | 161 ----------
.../org/apache/falcon/process/PigProcessIT.java | 6 +-
.../falcon/resource/EntityManagerJerseyIT.java | 110 ++++---
.../resource/EntityManagerJerseySmokeIT.java | 119 +++++++
.../resource/ProcessInstanceManagerIT.java | 2 +-
.../org/apache/falcon/resource/TestContext.java | 37 ++-
webapp/src/test/resources/cluster-template.xml | 4 +-
49 files changed, 1649 insertions(+), 738 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index efe0d88..0ea0245 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,7 @@ Trunk (Unreleased)
OPTIMIZATIONS
+ FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
FALCON-260 When a process is scheduled, the user workflow is failing with OozieClientException.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/acquisition/pom.xml
----------------------------------------------------------------------
diff --git a/acquisition/pom.xml b/acquisition/pom.xml
index e2017dd..040f9c5 100644
--- a/acquisition/pom.xml
+++ b/acquisition/pom.xml
@@ -31,4 +31,11 @@
<name>Apache Falcon Acquisition</name>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/archival/pom.xml
----------------------------------------------------------------------
diff --git a/archival/pom.xml b/archival/pom.xml
index c43e645..c9dda31 100644
--- a/archival/pom.xml
+++ b/archival/pom.xml
@@ -31,4 +31,11 @@
<name>Apache Falcon Archival</name>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/build-tools/pom.xml b/build-tools/pom.xml
index 7a020d3..00b913f 100644
--- a/build-tools/pom.xml
+++ b/build-tools/pom.xml
@@ -30,6 +30,14 @@
<artifactId>build-tools</artifactId>
<name>Build Tools</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 01d2ced..37ccf4f 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -131,17 +131,18 @@ public class FalconCLI {
try {
CLIParser.Command command = parser.parse(args);
+ int exitValue = 0;
if (command.getName().equals(HELP_CMD)) {
parser.showHelp();
} else if (command.getName().equals(ADMIN_CMD)) {
- adminCommand(command.getCommandLine());
+ exitValue = adminCommand(command.getCommandLine());
} else if (command.getName().equals(ENTITY_CMD)) {
entityCommand(command.getCommandLine());
} else if (command.getName().equals(INSTANCE_CMD)) {
instanceCommand(command.getCommandLine());
}
- return 0;
+ return exitValue;
} catch (FalconCLIException ex) {
ERR.get().println("Error: " + ex.getMessage());
return -1;
@@ -562,7 +563,7 @@ public class FalconCLI {
return url;
}
- private void adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
+ private int adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
String result;
String falconUrl = getFalconEndpoint(commandLine);
FalconClient client = new FalconClient(falconUrl);
@@ -576,6 +577,7 @@ public class FalconCLI {
result = client.getThreadDump();
OUT.get().println(result);
}
+ int exitValue = 0;
if (optionsList.contains(STATUS_OPTION)) {
int status = 0;
try {
@@ -583,11 +585,13 @@ public class FalconCLI {
if (status != 200) {
ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
+ "Please check log files.");
+ exitValue = status;
} else {
OUT.get().println("Falcon server is running (on " + falconUrl + ")");
}
} catch (Exception e) {
ERR.get().println("Falcon server doesn't seem to be running on " + falconUrl);
+ exitValue = -1;
}
} else if (optionsList.contains(VERSION_OPTION)) {
result = client.getVersion();
@@ -595,6 +599,7 @@ public class FalconCLI {
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
}
+ return exitValue;
}
private Properties getClientProperties() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 10a9cc0..432d06b 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -80,6 +81,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
conf = dfsCluster.getConf();
fs = dfsCluster.getFileSystem();
+ fs.delete(new Path("/"), true);
storeEntity(EntityType.CLUSTER, "testCluster");
System.setProperty("test.build.data", "target/tdfs/data" + System.currentTimeMillis());
@@ -111,6 +113,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
tfs = targetDfsCluster.getFileSystem();
+ tfs.delete(new Path("/"), true);
fs.mkdirs(feedInstanceLogPath);
fs.mkdirs(feedInstanceLogPath1);
tfs.mkdirs(feedInstanceLogPath);
@@ -120,11 +123,9 @@ public class LogCleanupServiceTest extends AbstractTestBase {
// table feed staging dir setup
initializeStagingDirs();
- createStageData(sourceStagingPath1, targetStagingPath1);
-
- Thread.sleep(61000);
-
- createStageData(sourceStagingPath2, targetStagingPath2);
+ createStageData(sourceStagingPath1, targetStagingPath1, 0);
+ createStageData(sourceStagingPath2, targetStagingPath2, 10000);
+ Thread.sleep(1000);
}
private void initializeStagingDirs() throws Exception {
@@ -147,14 +148,26 @@ public class LogCleanupServiceTest extends AbstractTestBase {
targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
}
- private void createStageData(Path sourcePath, Path targetPath) throws Exception {
+ private void createStageData(Path sourcePath, Path targetPath, int offset) throws Exception {
fs.mkdirs(sourcePath);
- fs.createNewFile(new Path(sourcePath, "_metadata.xml"));
- fs.createNewFile(new Path(sourcePath, "data.txt"));
+ Path metaSource = new Path(sourcePath, "_metadata.xml");
+ Path dataSource = new Path(sourcePath, "data.txt");
+ fs.createNewFile(metaSource);
+ fs.createNewFile(dataSource);
+ FileStatus status = fs.getFileStatus(metaSource);
+ fs.setTimes(metaSource, status.getModificationTime() + offset, status.getAccessTime());
+ status = fs.getFileStatus(dataSource);
+ fs.setTimes(dataSource, status.getModificationTime() + offset, status.getAccessTime());
tfs.mkdirs(targetPath);
- tfs.createNewFile(new Path(targetPath, "_metadata.xml"));
- tfs.createNewFile(new Path(targetPath, "data.txt"));
+ Path metaTarget = new Path(targetPath, "_metadata.xml");
+ Path dataTarget = new Path(targetPath, "data.txt");
+ tfs.createNewFile(metaTarget);
+ tfs.createNewFile(dataTarget);
+ status = tfs.getFileStatus(metaTarget);
+ tfs.setTimes(metaTarget, status.getModificationTime() + offset, status.getAccessTime());
+ status = tfs.getFileStatus(dataTarget);
+ tfs.setTimes(dataTarget, status.getModificationTime() + offset, status.getAccessTime());
}
@Test
@@ -169,7 +182,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertTrue(fs.exists(instanceLogPath3));
}
- @Test
+ @Test (enabled = false)
public void testFeedLogs() throws IOException, FalconException, InterruptedException {
AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 6917472..7b48d2b 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -75,8 +75,8 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "jail://global:00/foo/bar");
}
@Test
@@ -106,21 +106,21 @@ public class FileSystemStorageTest {
StringBuilder expected = new StringBuilder();
expected.append(LocationType.DATA)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/data/foo/bar")
+ .append("jail://global:00/data/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.META)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/meta/foo/bar")
+ .append("jail://global:00/meta/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.STATS)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/stats/foo/bar")
+ .append("jail://global:00/stats/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.TMP)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/tmp/foo/bar");
+ .append("jail://global:00/tmp/foo/bar");
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
Assert.assertEquals(storage.getUriTemplate(), expected.toString());
}
@@ -139,9 +139,9 @@ public class FileSystemStorageTest {
@DataProvider(name = "locationTestDataProvider")
private Object[][] createLocationTestData() {
return new Object[][] {
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
@@ -169,13 +169,13 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
Assert.assertTrue(storage.exists());
}
@Test
public void testIsIdentical() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
+ final String storageUrl = "jail://global:00";
final Location location1 = new Location();
location1.setPath("/foo/bar");
location1.setType(LocationType.DATA);
@@ -195,7 +195,7 @@ public class FileSystemStorageTest {
@Test
public void testIsIdenticalNegative() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
+ final String storageUrl = "jail://global:00";
final Location location1 = new Location();
location1.setPath("/foo/baz");
location1.setType(LocationType.DATA);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/test/resources/runtime.properties b/common/src/test/resources/runtime.properties
new file mode 100644
index 0000000..f76ff51
--- /dev/null
+++ b/common/src/test/resources/runtime.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=unittest
+
+unittest.log.cleanup.frequency.minutes.retention=500
+unittest.log.cleanup.frequency.hours.retention=500
+unittest.log.cleanup.frequency.days.retention=5000
+unittest.log.cleanup.frequency.months.retention=500
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 128784e..a37755b 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -40,7 +40,6 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.DECISION;
import org.apache.falcon.oozie.workflow.JAVA;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -110,7 +109,7 @@ public class OozieFeedMapperTest {
if (type == EntityType.CLUSTER) {
Cluster cluster = (Cluster) entity;
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
- FileSystem fs = new Path(writeEndpoint).getFileSystem(new Configuration());
+ FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
fs.create(
new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml
index 225b9d9..9653af2 100644
--- a/hadoop-dependencies/pom.xml
+++ b/hadoop-dependencies/pom.xml
@@ -52,9 +52,61 @@
<artifactId>hadoop-client</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/versioned-src/v2/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.basedir}/target/classes/META-INF/services</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.basedir}/src/main/services</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</profile>
</profiles>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
new file mode 100644
index 0000000..e12a509
--- /dev/null
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hadoop;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * chroot-style local file system for tests.
+ *
+ * The host part of the file system URI names a directory below the jail base on the local disk;
+ * every operation translates its path into that directory and delegates to a {@link LocalFileSystem}.
+ */
+public class JailedFileSystem extends FileSystem {
+    private URI uri;
+    private String basePath;
+    private LocalFileSystem localFS;
+
+    public JailedFileSystem() {
+        localFS = new LocalFileSystem();
+    }
+
+    /**
+     * Initializes the delegate local file system and derives the jail root directory.
+     *
+     * The root is {@code <base>/jail-fs/<host of the URI>}, where the base comes from the
+     * {@code jail.base} configuration key, else the {@code hadoop.tmp.dir} system property, else the
+     * {@code hadoop.tmp.dir} configuration key, else {@code /tmp}.
+     *
+     * @param name jail URI; its host part names the jail
+     * @param conf configuration used for the delegate and for locating the jail root
+     * @throws IOException if the URI carries no host part
+     */
+    @Override
+    public void initialize(URI name, Configuration conf) throws IOException {
+        super.initialize(name, conf);
+        setConf(conf);
+        localFS.initialize(LocalFileSystem.getDefaultUri(conf), conf);
+        String base = name.getHost();
+        if (base == null) {
+            throw new IOException("Incomplete Jail URI, no jail base: "+ name);
+        }
+        basePath = new Path(conf.get("jail.base", System.getProperty("hadoop.tmp.dir",
+                conf.get("hadoop.tmp.dir", "/tmp"))) + "/jail-fs/" + base).toUri().getPath();
+        this.uri = URI.create(name.getScheme()+"://"+name.getAuthority());
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    /** Translates a jail path into the matching path below the jail root on the local file system. */
+    private Path toLocalPath(Path f) {
+        return new Path(basePath + f.toUri().getPath());
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return localFS.open(toLocalPath(f), bufferSize);
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+                                     short replication, long blockSize, Progressable progress) throws IOException {
+        return localFS.create(toLocalPath(f), permission, overwrite, bufferSize,
+                replication, blockSize, progress);
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+        return localFS.append(toLocalPath(f), bufferSize, progress);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        return localFS.rename(toLocalPath(src), toLocalPath(dst));
+    }
+
+    @Override
+    public boolean delete(Path f) throws IOException {
+        // Pass the jail path through untouched: delete(Path, boolean) does the translation itself,
+        // so translating here as well would prefix the jail root twice.
+        return delete(f, false);
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        return localFS.delete(toLocalPath(f), recursive);
+    }
+
+    /**
+     * Lists a jail directory, reporting every entry under its jail path rather than its local path.
+     * A null or empty listing from the delegate is returned unchanged.
+     */
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+        FileStatus[] fileStatuses = localFS.listStatus(toLocalPath(f));
+        if (fileStatuses == null || fileStatuses.length == 0) {
+            return fileStatuses;
+        } else {
+            FileStatus[] jailFileStatuses = new FileStatus[fileStatuses.length];
+            for (int index = 0; index < fileStatuses.length; index++) {
+                FileStatus status = fileStatuses[index];
+                jailFileStatuses[index] = new FileStatus(status.getLen(), status.isDir(),
+                        status.getReplication(), status.getBlockSize(), status.getModificationTime(),
+                        status.getAccessTime(), status.getPermission(), status.getOwner(), status.getGroup(),
+                        fromLocalPath(status.getPath()).makeQualified(this));
+            }
+            return jailFileStatuses;
+        }
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return new Path("/user/" + System.getProperty("user.name"));
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        return localFS.mkdirs(toLocalPath(f), permission);
+    }
+
+    /**
+     * Computes an MD5 checksum over the whole file content, which is read into memory.
+     * The returned checksum does not serialize itself: write and readFields are no-ops.
+     */
+    @Override
+    public FileChecksum getFileChecksum(Path f) throws IOException {
+        final byte[] md5 = DigestUtils.md5(FileUtils.readFileToByteArray(new File(toLocalPath(f).toString())));
+        return new FileChecksum() {
+
+            @Override
+            public String getAlgorithmName() {
+                return "MD5";
+            }
+
+            @Override
+            public int getLength() {
+                return md5.length;
+            }
+
+            @Override
+            public byte[] getBytes() {
+                return md5;
+            }
+
+            @Override
+            public void write(DataOutput out) throws IOException {
+            }
+
+            @Override
+            public void readFields(DataInput in) throws IOException {
+            }
+        };
+    }
+
+    /** Returns the status of a jail path, with the reported path expressed in jail terms. */
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        FileStatus status = localFS.getFileStatus(toLocalPath(f));
+        if (status == null) {
+            return null;
+        }
+        return new FileStatus(status.getLen(), status.isDir(),
+                status.getReplication(), status.getBlockSize(), status.getModificationTime(),
+                status.getAccessTime(), status.getPermission(), status.getOwner(), status.getGroup(),
+                fromLocalPath(status.getPath()).makeQualified(this.getUri(), this.getWorkingDirectory()));
+    }
+
+    /** Translates a local path back into a jail path; the jail root itself maps to "/". */
+    private Path fromLocalPath(Path path) {
+        String localPath = path.toUri().getPath();
+        // Strip the jail root as a literal prefix. String.replaceFirst would read basePath as a
+        // regular expression and could also match somewhere other than the start of the path.
+        String pathString = localPath.startsWith(basePath)
+                ? localPath.substring(basePath.length()) : localPath;
+        return new Path(pathString.isEmpty() ? "/" : pathString);
+    }
+
+    @Override
+    public void setTimes(Path p, long mtime, long atime) throws IOException {
+        // FileSystem.setTimes is an empty default, so hand the request to the local delegate.
+        localFS.setTimes(toLocalPath(p), mtime, atime);
+    }
+
+    @Override
+    public void close() throws IOException {
+        localFS.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..d8a60f6
--- /dev/null
+++ b/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.mapred.ClassicClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
new file mode 100644
index 0000000..079eca9
--- /dev/null
+++ b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Protocol provider for Hadoop v2 based tests that serves jobs through a {@link LocalJobRunner}.
+ */
+public class ClassicClientProtocolProvider extends ClientProtocolProvider {
+
+    private static final String LOCALHOST = "localhost";
+
+    /**
+     * Returns a {@link LocalJobRunner} when the configured framework name is "unittests" and the
+     * job tracker (or, failing that, resource manager) address starts with "localhost"; else null.
+     */
+    @Override
+    public ClientProtocol create(Configuration conf) throws IOException {
+        final String frameworkName = conf.get(MRConfig.FRAMEWORK_NAME, "unittests");
+        final String resourceManager = conf.get("yarn.resourcemanager.address", LOCALHOST);
+        final String jobTracker = conf.get("mapred.job.tracker", resourceManager);
+        if ("unittests".equals(frameworkName) && jobTracker.startsWith(LOCALHOST)) {
+            return new LocalJobRunner(conf);
+        }
+        return null;
+    }
+
+    /** Ignores the address and behaves exactly like {@link #create(Configuration)}. */
+    @Override
+    public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+        return create(conf);
+    }
+
+    /** Does nothing. */
+    @Override
+    public void close(ClientProtocol clientProtocol) throws IOException {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 7640883..e576310 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -18,7 +18,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -44,6 +44,29 @@
<scope>compile</scope>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/versioned-src/v1/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop-2</id>
@@ -71,6 +94,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
</dependencies>
@@ -95,6 +123,22 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-test-util</artifactId>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
@@ -125,6 +169,11 @@
<groupId>org.apache.hcatalog</groupId>
<artifactId>webhcat-java-client</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java b/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
new file mode 100644
index 0000000..fa3a84a
--- /dev/null
+++ b/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon;
+
+/**
+ * Life-cycle contract for a job tracker service that can be started and stopped.
+ */
+public interface JobTrackerService {
+
+    /**
+     * Starts the service.
+     *
+     * @throws Exception if the service cannot be started
+     */
+    void start() throws Exception;
+
+    /**
+     * Stops the service.
+     *
+     * @throws Exception if the service cannot be stopped
+     */
+    void stop() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java b/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
index c2bb6f7..1468ac4 100644
--- a/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
+++ b/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
@@ -18,22 +18,14 @@
package org.apache.falcon.listener;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
import org.apache.activemq.broker.BrokerService;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.falcon.JobTrackerService;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
/**
* Listener for bootstrapping embedded hadoop cluster for integration tests.
*/
@@ -44,80 +36,27 @@ public class HadoopStartupListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
try {
- FileUtils.deleteDirectory(new File(System.getProperty("hadoop.tmp.dir")));
- final Configuration conf = new Configuration();
-
- NameNode.format(conf);
- final String[] emptyArgs = {};
- try {
- Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");
- startHadoop2Services(conf, emptyArgs);
- } catch (ClassNotFoundException cnfe) {
- startHadoop1Services(conf, emptyArgs);
- }
+ startLocalJobRunner();
startBroker();
-
startHiveMetaStore();
} catch (Exception e) {
- e.printStackTrace();
- LOG.error("Unable to start hadoop cluster", e);
- throw new RuntimeException("Unable to start hadoop cluster", e);
+ LOG.error("Unable to start daemons", e);
+ throw new RuntimeException("Unable to start daemons", e);
}
}
- private void startHadoop1Services(Configuration conf, String[] emptyArgs)
- throws IOException, IllegalAccessException, InvocationTargetException,
- NoSuchMethodException, ClassNotFoundException, InstantiationException {
-
- NameNode.createNameNode(emptyArgs, conf);
- DataNode.createDataNode(emptyArgs, conf);
-
- JobConf jobConf = new JobConf(conf);
- // JobTracker jt = JobTracker.startTracker(jobConf);
- // jt.offerService();
- // TaskTracker tt = new TaskTracker(jobConf);
- // tt.run();
-
- Object jt = Class.forName("org.apache.hadoop.mapred.JobTracker")
- .getMethod("startTracker", JobConf.class).invoke(null, jobConf);
- startService(jt, "offerService");
-
- Object tt = Class.forName("org.apache.hadoop.mapred.TaskTracker")
- .getConstructor(JobConf.class).newInstance(jobConf);
- startService(tt, "run");
- }
-
- private void startHadoop2Services(Configuration conf, String[] emptyArgs) throws Exception {
-
- // DefaultMetricsSystem.setMiniClusterMode(true);
- // ResourceManager resourceManager = new ResourceManager(new MemStore());
- // YarnConfiguration yarnConf = new YarnConfiguration(conf);
- // resourceManager.init(yarnConf);
- // resourceManager.start();
- // NodeManager nodeManager = new NodeManager();
- // nodeManager.init(yarnConf);
- // nodeManager.start();
-
- Class.forName("org.apache.hadoop.metrics2.lib.DefaultMetricsSystem")
- .getMethod("setMiniClusterMode", boolean.class).invoke(null, true);
-
- NameNode.createNameNode(emptyArgs, conf);
- DataNode.createDataNode(emptyArgs, conf);
-
- Object memStore = instance("org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore");
- Object resourceManager = Class.forName("org.apache.hadoop.yarn.server.resourcemanager.ResourceManager")
- .getConstructor(Class.forName("org.apache.hadoop.yarn.server.resourcemanager.recovery.Store"))
- .newInstance(memStore);
- Object yarnConf = Class.forName("org.apache.hadoop.yarn.conf.YarnConfiguration")
- .getConstructor(Configuration.class).newInstance(conf);
- invoke(resourceManager, "init", Configuration.class, yarnConf);
- startService(resourceManager, "start");
-
- Object nodeManager = instance("org.apache.hadoop.yarn.server.nodemanager.NodeManager");
- invoke(nodeManager, "init", Configuration.class, yarnConf);
- startService(nodeManager, "start");
+ @SuppressWarnings("unchecked")
+ private void startLocalJobRunner() throws Exception {
+ String className = "org.apache.hadoop.mapred.LocalRunnerV1";
+ try {
+ Class<? extends JobTrackerService> runner = (Class<? extends JobTrackerService>) Class.forName(className);
+ JobTrackerService service = runner.newInstance();
+ service.start();
+ } catch (ClassNotFoundException e) {
+ LOG.warn("v1 Hadoop components not found. Assuming v2", e);
+ }
}
private void startBroker() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/core-site.xml b/hadoop-webapp/src/main/resources/core-site.xml
index 484e904..615b4d9 100644
--- a/hadoop-webapp/src/main/resources/core-site.xml
+++ b/hadoop-webapp/src/main/resources/core-site.xml
@@ -25,8 +25,13 @@
</property>
<property>
+ <name>fs.jail.impl</name>
+ <value>org.apache.falcon.hadoop.JailedFileSystem</value>
+ </property>
+
+ <property>
<name>fs.default.name</name>
- <value>hdfs://localhost:41020</value>
+ <value>jail://global:00</value>
</property>
<property>
@@ -46,4 +51,5 @@
<value>${project.build.directory}/tmp-hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/hive-site.xml b/hadoop-webapp/src/main/resources/hive-site.xml
index 49cda78..f03a3a6 100644
--- a/hadoop-webapp/src/main/resources/hive-site.xml
+++ b/hadoop-webapp/src/main/resources/hive-site.xml
@@ -31,7 +31,7 @@
<property>
<name>fs.default.name</name>
- <value>hdfs://localhost:41020</value>
+ <value>jail://global:00</value>
</property>
<!-- Forcing the creation of the db dir under target -->
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/yarn-site.xml b/hadoop-webapp/src/main/resources/yarn-site.xml
index 587f1c5..658752b 100644
--- a/hadoop-webapp/src/main/resources/yarn-site.xml
+++ b/hadoop-webapp/src/main/resources/yarn-site.xml
@@ -29,7 +29,7 @@
<property>
<name>mapreduce.framework.name</name>
- <value>yarn</value>
+ <value>unittests</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java b/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
new file mode 100644
index 0000000..c96bb0e
--- /dev/null
+++ b/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.conf.Configuration;
+import org.testng.annotations.Test;
+
+/**
+ * Test for LocalRunner.
+ *
+ * The test is disabled ({@code enabled = false}). When run under hadoop profile "1"
+ * it starts org.apache.hadoop.mapred.LocalRunnerV1 reflectively (the class only exists
+ * in that profile), talks to it through a {@link JobClient} and stops it again.
+ */
+@Test (enabled = false)
+public class LocalRunnerTest {
+
+    /**
+     * Starts the hosted local runner (profile "1" only) and prints the system
+     * directory reported through a JobClient configured for localhost:41021.
+     *
+     * @throws Exception if the runner cannot be loaded or started, or the client call fails
+     */
+    public void testLocalRunner() throws Exception {
+        Configuration conf = new Configuration();
+        conf.set("mapred.job.tracker", "localhost:41021");
+        conf.set("mapreduce.framework.name", "unittests");
+
+        JobTrackerService service = null;
+        String hadoopProfile = System.getProperty("hadoop.profile", "1");
+        if ("1".equals(hadoopProfile)) {
+            String className = "org.apache.hadoop.mapred.LocalRunnerV1";
+            // asSubclass performs a checked narrowing, so no unchecked cast is needed.
+            Class<? extends JobTrackerService> runner =
+                    Class.forName(className).asSubclass(JobTrackerService.class);
+            service = runner.newInstance();
+            service.start();
+        }
+        try {
+            JobClient client = new JobClient(new JobConf(conf));
+            System.out.println(client.getSystemDir());
+        } finally {
+            // Always release the RPC server so the port is not left bound after the test.
+            if (service != null) {
+                service.stop();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java b/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
new file mode 100644
index 0000000..5819cce
--- /dev/null
+++ b/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Hosted Local Job runner.
+ * Please note that one of org.apache.hadoop.mapred.LocalRunnerV2 or
+ * org.apache.hadoop.mapred.LocalRunnerV2 is active in the project depending
+ * on the profile chosen.
+ */
+public class LocalRunnerV1 implements JobSubmissionProtocol, JobTrackerService {
+
+ private final JobSubmissionProtocol localProxy;
+ private final JobConf conf;
+ private RPC.Server server;
+
+ public LocalRunnerV1() {
+ try {
+ conf = new JobConf();
+ localProxy = new LocalJobRunner(conf);
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to initialize localRunner");
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ String[] tracker = conf.get("mapred.job.tracker", "localhost:41021").split(":");
+ server = RPC.getServer(this, tracker[0], Integer.parseInt(tracker[1]), conf);
+ server.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ server.stop();
+ }
+
+ @Override
+ public JobID getNewJobId() throws IOException {
+ return localProxy.getNewJobId();
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException {
+ return localProxy.submitJob(jobName, jobSubmitDir, ts);
+ }
+
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ return localProxy.getClusterStatus(detailed);
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ return localProxy.getQueueAdmins(queueName);
+ }
+
+ @Override
+ public void killJob(JobID jobid) throws IOException {
+ localProxy.killJob(jobid);
+ }
+
+ @Override
+ public void setJobPriority(JobID jobid, String priority) throws IOException {
+ localProxy.setJobPriority(jobid, priority);
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
+ return localProxy.killTask(taskId, shouldFail);
+ }
+
+ @Override
+ public JobProfile getJobProfile(JobID jobid) throws IOException {
+ return localProxy.getJobProfile(jobid);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobid) throws IOException {
+ return localProxy.getJobStatus(jobid);
+ }
+
+ @Override
+ public Counters getJobCounters(JobID jobid) throws IOException {
+ return localProxy.getJobCounters(jobid);
+ }
+
+ @Override
+ public TaskReport[] getMapTaskReports(JobID jobid) throws IOException {
+ return localProxy.getMapTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException {
+ return localProxy.getReduceTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException {
+ return localProxy.getCleanupTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException {
+ return localProxy.getSetupTaskReports(jobid);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException {
+ return localProxy.getFilesystemName();
+ }
+
+ @Override
+ public JobStatus[] jobsToComplete() throws IOException {
+ return localProxy.jobsToComplete();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ return localProxy.getAllJobs();
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+ throws IOException {
+ return localProxy.getTaskCompletionEvents(jobid, fromEventId, maxEvents);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException {
+ return localProxy.getTaskDiagnostics(taskId);
+ }
+
+ @Override
+ public String getSystemDir() {
+ return localProxy.getSystemDir();
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException {
+ return localProxy.getStagingAreaDir();
+ }
+
+ @Override
+ public JobQueueInfo[] getQueues() throws IOException {
+ return localProxy.getQueues();
+ }
+
+ @Override
+ public JobQueueInfo getQueueInfo(String queue) throws IOException {
+ return localProxy.getQueueInfo(queue);
+ }
+
+ @Override
+ public JobStatus[] getJobsFromQueue(String queue) throws IOException {
+ return localProxy.getJobsFromQueue(queue);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
+ return localProxy.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
+ return new Token<DelegationTokenIdentifier>(null, null, null, null);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ return localProxy.renewDelegationToken(token);
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ localProxy.cancelDelegationToken(token);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return localProxy.getProtocolVersion(protocol, clientVersion);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java b/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
new file mode 100644
index 0000000..ccb1bd5
--- /dev/null
+++ b/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Local Job Runner for Hadoop v2.
+ * Please note that one of org.apache.hadoop.mapred.LocalRunnerV1 or
+ * org.apache.hadoop.mapred.LocalRunnerV2 is active in the project depending
+ * on the profile chosen.
+ *
+ * Wraps a {@link LocalJobRunner} and publishes it as a {@link ClientProtocol}
+ * RPC endpoint. Every protocol call is forwarded unchanged to the wrapped runner.
+ */
+public class LocalRunnerV2 implements ClientProtocol, JobTrackerService {
+
+    private static final String BIND_ADDRESS = "0.0.0.0";
+    private static final int PORT = 41021;
+
+    private final ClientProtocol localProxy;
+    private final Configuration conf;
+    private Server server;
+
+    /**
+     * Creates the runner with a default Configuration and a LocalJobRunner built on it.
+     *
+     * @throws RuntimeException wrapping the IOException if the LocalJobRunner cannot be created
+     */
+    public LocalRunnerV2() {
+        try {
+            conf = new Configuration();
+            localProxy = new LocalJobRunner(conf);
+        } catch (IOException e) {
+            // Keep the cause so the real initialization failure is visible in the stack trace.
+            throw new RuntimeException("Unable to initialize localRunner", e);
+        }
+    }
+
+    /**
+     * Builds and starts an RPC server exposing this instance as ClientProtocol
+     * on 0.0.0.0:41021.
+     *
+     * @throws Exception if the RPC server cannot be built or started
+     */
+    @Override
+    public void start() throws Exception {
+        server = new RPC.Builder(conf).setBindAddress(BIND_ADDRESS).setPort(PORT).setInstance(this).
+                setProtocol(ClientProtocol.class).build();
+        server.start();
+    }
+
+    /**
+     * Stops the RPC server; does nothing if start() has not been called.
+     *
+     * @throws Exception if stopping the server fails
+     */
+    @Override
+    public void stop() throws Exception {
+        if (server != null) {
+            server.stop();
+        }
+    }
+
+    // ---- ClientProtocol: calls below are forwarded to localProxy ----
+
+    @Override
+    public JobID getNewJobID() throws IOException, InterruptedException {
+        return localProxy.getNewJobID();
+    }
+
+    @Override
+    public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+        throws IOException, InterruptedException {
+        return localProxy.submitJob(jobId, jobSubmitDir, ts);
+    }
+
+    @Override
+    public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
+        return localProxy.getClusterMetrics();
+    }
+
+    @Override
+    public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
+        return localProxy.getJobTrackerStatus();
+    }
+
+    @Override
+    public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+        return localProxy.getTaskTrackerExpiryInterval();
+    }
+
+    @Override
+    public AccessControlList getQueueAdmins(String queueName) throws IOException {
+        return localProxy.getQueueAdmins(queueName);
+    }
+
+    @Override
+    public void killJob(JobID jobid) throws IOException, InterruptedException {
+        localProxy.killJob(jobid);
+    }
+
+    @Override
+    public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
+        localProxy.setJobPriority(jobid, priority);
+    }
+
+    @Override
+    public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, InterruptedException {
+        return localProxy.killTask(taskId, shouldFail);
+    }
+
+    @Override
+    public JobStatus getJobStatus(JobID jobid) throws IOException, InterruptedException {
+        return localProxy.getJobStatus(jobid);
+    }
+
+    @Override
+    public Counters getJobCounters(JobID jobid) throws IOException, InterruptedException {
+        return localProxy.getJobCounters(jobid);
+    }
+
+    @Override
+    public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
+        return localProxy.getTaskReports(jobid, type);
+    }
+
+    @Override
+    public String getFilesystemName() throws IOException, InterruptedException {
+        return localProxy.getFilesystemName();
+    }
+
+    @Override
+    public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+        return localProxy.getAllJobs();
+    }
+
+    @Override
+    public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+        throws IOException, InterruptedException {
+        return localProxy.getTaskCompletionEvents(jobid, fromEventId, maxEvents);
+    }
+
+    @Override
+    public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
+        return localProxy.getTaskDiagnostics(taskId);
+    }
+
+    @Override
+    public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
+        return localProxy.getActiveTrackers();
+    }
+
+    @Override
+    public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
+        return localProxy.getBlacklistedTrackers();
+    }
+
+    @Override
+    public String getSystemDir() throws IOException, InterruptedException {
+        return localProxy.getSystemDir();
+    }
+
+    @Override
+    public String getStagingAreaDir() throws IOException, InterruptedException {
+        return localProxy.getStagingAreaDir();
+    }
+
+    @Override
+    public String getJobHistoryDir() throws IOException, InterruptedException {
+        return localProxy.getJobHistoryDir();
+    }
+
+    @Override
+    public QueueInfo[] getQueues() throws IOException, InterruptedException {
+        return localProxy.getQueues();
+    }
+
+    @Override
+    public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
+        return localProxy.getQueue(queueName);
+    }
+
+    @Override
+    public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
+        return localProxy.getQueueAclsForCurrentUser();
+    }
+
+    @Override
+    public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+        return localProxy.getRootQueues();
+    }
+
+    @Override
+    public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
+        return localProxy.getChildQueues(queueName);
+    }
+
+    @Override
+    public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
+        return localProxy.getDelegationToken(renewer);
+    }
+
+    @Override
+    public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+        return localProxy.renewDelegationToken(token);
+    }
+
+    @Override
+    public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+        localProxy.cancelDelegationToken(token);
+    }
+
+    @Override
+    public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+        throws IOException, InterruptedException {
+        return localProxy.getLogFileParams(jobID, taskAttemptID);
+    }
+
+    @Override
+    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+        return localProxy.getProtocolVersion(protocol, clientVersion);
+    }
+
+    @Override
+    public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
+        throws IOException {
+        return localProxy.getProtocolSignature(protocol, clientVersion, clientMethodsHash);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index a59c1e3..9aa5347 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -87,6 +87,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 27bea68..da126c7 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -138,7 +138,7 @@ public class FalconTopicProducerTest {
}
private List<String> createCommonArgs() {
- List<String> args = new ArrayList<String>(Arrays.asList(
+ return new ArrayList<String>(Arrays.asList(
"-" + ARG.workflowId.getArgName(), "workflow-01-00",
"-" + ARG.runId.getArgName(), "1",
"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
@@ -151,7 +151,6 @@ public class FalconTopicProducerTest {
"-" + ARG.status.getArgName(), ("SUCCEEDED"),
"-" + ARG.brokerTTL.getArgName(), "10",
"-" + ARG.cluster.getArgName(), "corp"));
- return args;
}
private void testProcessMessageCreator(final List<String[]> messages,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index a1609af..e707567 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -138,7 +138,7 @@ public class FeedProducerTest {
}
};
t.start();
- Thread.sleep(1500);
+ Thread.sleep(100);
new MessageProducer().run(this.args);
t.join();
if (error != null) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 078b9c2..3a40e76 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -92,7 +92,7 @@ public class ProcessProducerTest {
}
};
t.start();
- Thread.sleep(1500);
+ Thread.sleep(100);
new MessageProducer().run(this.args);
t.join();
if (error != null) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index 1ed4f6b..f1fba81 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -40,6 +40,12 @@
</dependency>
<dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/oozie-el-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml
index 4c76c89..06dbec4 100644
--- a/oozie-el-extensions/pom.xml
+++ b/oozie-el-extensions/pom.xml
@@ -67,6 +67,12 @@
</dependency>
<dependency>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>