You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sh...@apache.org on 2014/01/22 11:23:34 UTC
[3/3] git commit: FALCON-123 Improve build speeds in falcon.
Contributed by Srikanth Sundarrajan
FALCON-123 Improve build speeds in falcon. Contributed by Srikanth Sundarrajan
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/d1642bea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/d1642bea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/d1642bea
Branch: refs/heads/master
Commit: d1642beab5d5073dd57c43137f0b7b22cb87ecf9
Parents: d555dd5
Author: Shwetha GS <sh...@gmail.com>
Authored: Wed Jan 22 15:53:21 2014 +0530
Committer: Shwetha GS <sh...@gmail.com>
Committed: Wed Jan 22 15:53:21 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
acquisition/pom.xml | 7 +
archival/pom.xml | 7 +
build-tools/pom.xml | 8 +
.../java/org/apache/falcon/cli/FalconCLI.java | 11 +-
.../falcon/cleanup/LogCleanupServiceTest.java | 35 +-
.../falcon/entity/FileSystemStorageTest.java | 26 +-
common/src/test/resources/runtime.properties | 25 ++
.../falcon/converter/OozieFeedMapperTest.java | 3 +-
hadoop-dependencies/pom.xml | 52 +++
.../apache/falcon/hadoop/JailedFileSystem.java | 195 +++++++++++
...op.mapreduce.protocol.ClientProtocolProvider | 14 +
.../mapred/ClassicClientProtocolProvider.java | 53 +++
hadoop-webapp/pom.xml | 51 ++-
.../org/apache/falcon/JobTrackerService.java | 28 ++
.../falcon/listener/HadoopStartupListener.java | 95 +-----
hadoop-webapp/src/main/resources/core-site.xml | 8 +-
hadoop-webapp/src/main/resources/hive-site.xml | 2 +-
hadoop-webapp/src/main/resources/yarn-site.xml | 2 +-
.../apache/hadoop/mapred/LocalRunnerTest.java | 46 +++
.../org/apache/hadoop/mapred/LocalRunnerV1.java | 208 ++++++++++++
.../org/apache/hadoop/mapred/LocalRunnerV2.java | 242 ++++++++++++++
messaging/pom.xml | 6 +
.../messaging/FalconTopicProducerTest.java | 3 +-
.../falcon/messaging/FeedProducerTest.java | 2 +-
.../falcon/messaging/ProcessProducerTest.java | 2 +-
metrics/pom.xml | 6 +
oozie-el-extensions/pom.xml | 6 +
pom.xml | 321 +++++++++++--------
.../converter/OozieProcessMapperTest.java | 4 +-
test-util/pom.xml | 5 +
.../falcon/cluster/util/EmbeddedCluster.java | 61 ++--
test-util/src/main/resources/core-site.xml | 31 ++
webapp/pom.xml | 52 ++-
webapp/src/conf/oozie/conf/action-conf/hive.xml | 2 +-
.../conf/oozie/conf/hadoop-conf/core-site.xml | 2 +-
webapp/src/conf/oozie/conf/oozie-site.xml | 9 +
.../falcon/catalog/HiveCatalogServiceIT.java | 2 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 42 +--
.../org/apache/falcon/cli/FalconCLISmokeIT.java | 100 ++++++
.../lifecycle/TableStorageFeedEvictorIT.java | 2 +-
.../org/apache/falcon/logging/LogMoverIT.java | 171 ----------
.../apache/falcon/logging/LogProviderIT.java | 161 ----------
.../org/apache/falcon/process/PigProcessIT.java | 6 +-
.../falcon/resource/EntityManagerJerseyIT.java | 110 ++++---
.../resource/EntityManagerJerseySmokeIT.java | 119 +++++++
.../resource/ProcessInstanceManagerIT.java | 2 +-
.../org/apache/falcon/resource/TestContext.java | 37 ++-
webapp/src/test/resources/cluster-template.xml | 4 +-
49 files changed, 1649 insertions(+), 738 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index efe0d88..0ea0245 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -29,6 +29,7 @@ Trunk (Unreleased)
OPTIMIZATIONS
+ FALCON-123 Improve build speeds in falcon. (Srikanth Sundarrajan via Shwetha GS)
BUG FIXES
FALCON-260 When a process is scheduled, the user workflow is failing with OozieClientException.
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/acquisition/pom.xml
----------------------------------------------------------------------
diff --git a/acquisition/pom.xml b/acquisition/pom.xml
index e2017dd..040f9c5 100644
--- a/acquisition/pom.xml
+++ b/acquisition/pom.xml
@@ -31,4 +31,11 @@
<name>Apache Falcon Acquisition</name>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/archival/pom.xml
----------------------------------------------------------------------
diff --git a/archival/pom.xml b/archival/pom.xml
index c43e645..c9dda31 100644
--- a/archival/pom.xml
+++ b/archival/pom.xml
@@ -31,4 +31,11 @@
<name>Apache Falcon Archival</name>
<packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/build-tools/pom.xml
----------------------------------------------------------------------
diff --git a/build-tools/pom.xml b/build-tools/pom.xml
index 7a020d3..00b913f 100644
--- a/build-tools/pom.xml
+++ b/build-tools/pom.xml
@@ -30,6 +30,14 @@
<artifactId>build-tools</artifactId>
<name>Build Tools</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 01d2ced..37ccf4f 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -131,17 +131,18 @@ public class FalconCLI {
try {
CLIParser.Command command = parser.parse(args);
+ int exitValue = 0;
if (command.getName().equals(HELP_CMD)) {
parser.showHelp();
} else if (command.getName().equals(ADMIN_CMD)) {
- adminCommand(command.getCommandLine());
+ exitValue = adminCommand(command.getCommandLine());
} else if (command.getName().equals(ENTITY_CMD)) {
entityCommand(command.getCommandLine());
} else if (command.getName().equals(INSTANCE_CMD)) {
instanceCommand(command.getCommandLine());
}
- return 0;
+ return exitValue;
} catch (FalconCLIException ex) {
ERR.get().println("Error: " + ex.getMessage());
return -1;
@@ -562,7 +563,7 @@ public class FalconCLI {
return url;
}
- private void adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
+ private int adminCommand(CommandLine commandLine) throws FalconCLIException, IOException {
String result;
String falconUrl = getFalconEndpoint(commandLine);
FalconClient client = new FalconClient(falconUrl);
@@ -576,6 +577,7 @@ public class FalconCLI {
result = client.getThreadDump();
OUT.get().println(result);
}
+ int exitValue = 0;
if (optionsList.contains(STATUS_OPTION)) {
int status = 0;
try {
@@ -583,11 +585,13 @@ public class FalconCLI {
if (status != 200) {
ERR.get().println("Falcon server is not fully operational (on " + falconUrl + "). "
+ "Please check log files.");
+ exitValue = status;
} else {
OUT.get().println("Falcon server is running (on " + falconUrl + ")");
}
} catch (Exception e) {
ERR.get().println("Falcon server doesn't seem to be running on " + falconUrl);
+ exitValue = -1;
}
} else if (optionsList.contains(VERSION_OPTION)) {
result = client.getVersion();
@@ -595,6 +599,7 @@ public class FalconCLI {
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
}
+ return exitValue;
}
private Properties getClientProperties() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
index 10a9cc0..432d06b 100644
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
@@ -29,6 +29,7 @@ import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -80,6 +81,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
this.dfsCluster = EmbeddedCluster.newCluster("testCluster");
conf = dfsCluster.getConf();
fs = dfsCluster.getFileSystem();
+ fs.delete(new Path("/"), true);
storeEntity(EntityType.CLUSTER, "testCluster");
System.setProperty("test.build.data", "target/tdfs/data" + System.currentTimeMillis());
@@ -111,6 +113,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
tfs = targetDfsCluster.getFileSystem();
+ tfs.delete(new Path("/"), true);
fs.mkdirs(feedInstanceLogPath);
fs.mkdirs(feedInstanceLogPath1);
tfs.mkdirs(feedInstanceLogPath);
@@ -120,11 +123,9 @@ public class LogCleanupServiceTest extends AbstractTestBase {
// table feed staging dir setup
initializeStagingDirs();
- createStageData(sourceStagingPath1, targetStagingPath1);
-
- Thread.sleep(61000);
-
- createStageData(sourceStagingPath2, targetStagingPath2);
+ createStageData(sourceStagingPath1, targetStagingPath1, 0);
+ createStageData(sourceStagingPath2, targetStagingPath2, 10000);
+ Thread.sleep(1000);
}
private void initializeStagingDirs() throws Exception {
@@ -147,14 +148,26 @@ public class LogCleanupServiceTest extends AbstractTestBase {
targetStagingPath2 = new Path(targetStagingDir + "/ds=2012092500/" + System.currentTimeMillis());
}
- private void createStageData(Path sourcePath, Path targetPath) throws Exception {
+ private void createStageData(Path sourcePath, Path targetPath, int offset) throws Exception {
fs.mkdirs(sourcePath);
- fs.createNewFile(new Path(sourcePath, "_metadata.xml"));
- fs.createNewFile(new Path(sourcePath, "data.txt"));
+ Path metaSource = new Path(sourcePath, "_metadata.xml");
+ Path dataSource = new Path(sourcePath, "data.txt");
+ fs.createNewFile(metaSource);
+ fs.createNewFile(dataSource);
+ FileStatus status = fs.getFileStatus(metaSource);
+ fs.setTimes(metaSource, status.getModificationTime() + offset, status.getAccessTime());
+ status = fs.getFileStatus(dataSource);
+ fs.setTimes(dataSource, status.getModificationTime() + offset, status.getAccessTime());
tfs.mkdirs(targetPath);
- tfs.createNewFile(new Path(targetPath, "_metadata.xml"));
- tfs.createNewFile(new Path(targetPath, "data.txt"));
+ Path metaTarget = new Path(targetPath, "_metadata.xml");
+ Path dataTarget = new Path(targetPath, "data.txt");
+ tfs.createNewFile(metaTarget);
+ tfs.createNewFile(dataTarget);
+ status = tfs.getFileStatus(metaTarget);
+ tfs.setTimes(metaTarget, status.getModificationTime() + offset, status.getAccessTime());
+ status = tfs.getFileStatus(dataTarget);
+ tfs.setTimes(dataTarget, status.getModificationTime() + offset, status.getAccessTime());
}
@Test
@@ -169,7 +182,7 @@ public class LogCleanupServiceTest extends AbstractTestBase {
Assert.assertTrue(fs.exists(instanceLogPath3));
}
- @Test
+ @Test (enabled = false)
public void testFeedLogs() throws IOException, FalconException, InterruptedException {
AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
index 6917472..7b48d2b 100644
--- a/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/FileSystemStorageTest.java
@@ -75,8 +75,8 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
- Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "hdfs://localhost:41020/foo/bar");
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
+ Assert.assertEquals(storage.getUriTemplate(LocationType.DATA), "jail://global:00/foo/bar");
}
@Test
@@ -106,21 +106,21 @@ public class FileSystemStorageTest {
StringBuilder expected = new StringBuilder();
expected.append(LocationType.DATA)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/data/foo/bar")
+ .append("jail://global:00/data/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.META)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/meta/foo/bar")
+ .append("jail://global:00/meta/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.STATS)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/stats/foo/bar")
+ .append("jail://global:00/stats/foo/bar")
.append(FileSystemStorage.FEED_PATH_SEP)
.append(LocationType.TMP)
.append(FileSystemStorage.LOCATION_TYPE_SEP)
- .append("hdfs://localhost:41020/tmp/foo/bar");
+ .append("jail://global:00/tmp/foo/bar");
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
Assert.assertEquals(storage.getUriTemplate(), expected.toString());
}
@@ -139,9 +139,9 @@ public class FileSystemStorageTest {
@DataProvider(name = "locationTestDataProvider")
private Object[][] createLocationTestData() {
return new Object[][] {
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
- {"hdfs://localhost:41020", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
+ {"jail://global:00", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2/", "/localDC/rc/billing/ua2"},
{"${nameNode}", "/localDC/rc/billing/ua2//", "/localDC/rc/billing/ua2"},
@@ -169,13 +169,13 @@ public class FileSystemStorageTest {
List<Location> locations = new ArrayList<Location>();
locations.add(location);
- FileSystemStorage storage = new FileSystemStorage("hdfs://localhost:41020", locations);
+ FileSystemStorage storage = new FileSystemStorage("jail://global:00", locations);
Assert.assertTrue(storage.exists());
}
@Test
public void testIsIdentical() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
+ final String storageUrl = "jail://global:00";
final Location location1 = new Location();
location1.setPath("/foo/bar");
location1.setType(LocationType.DATA);
@@ -195,7 +195,7 @@ public class FileSystemStorageTest {
@Test
public void testIsIdenticalNegative() throws Exception {
- final String storageUrl = "hdfs://localhost:41020";
+ final String storageUrl = "jail://global:00";
final Location location1 = new Location();
location1.setPath("/foo/baz");
location1.setType(LocationType.DATA);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/common/src/test/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/test/resources/runtime.properties b/common/src/test/resources/runtime.properties
new file mode 100644
index 0000000..f76ff51
--- /dev/null
+++ b/common/src/test/resources/runtime.properties
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+*.domain=unittest
+
+unittest.log.cleanup.frequency.minutes.retention=500
+unittest.log.cleanup.frequency.hours.retention=500
+unittest.log.cleanup.frequency.days.retention=5000
+unittest.log.cleanup.frequency.months.retention=500
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
----------------------------------------------------------------------
diff --git a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
index 128784e..a37755b 100644
--- a/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
+++ b/feed/src/test/java/org/apache/falcon/converter/OozieFeedMapperTest.java
@@ -40,7 +40,6 @@ import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.DECISION;
import org.apache.falcon.oozie.workflow.JAVA;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -110,7 +109,7 @@ public class OozieFeedMapperTest {
if (type == EntityType.CLUSTER) {
Cluster cluster = (Cluster) entity;
ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(writeEndpoint);
- FileSystem fs = new Path(writeEndpoint).getFileSystem(new Configuration());
+ FileSystem fs = new Path(writeEndpoint).getFileSystem(EmbeddedCluster.newConfiguration());
fs.create(new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/retention/ext.jar")).close();
fs.create(
new Path(ClusterHelper.getLocation(cluster, "working"), "libext/FEED/replication/ext.jar")).close();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/pom.xml b/hadoop-dependencies/pom.xml
index 225b9d9..9653af2 100644
--- a/hadoop-dependencies/pom.xml
+++ b/hadoop-dependencies/pom.xml
@@ -52,9 +52,61 @@
<artifactId>hadoop-client</artifactId>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/versioned-src/v2/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ <executions>
+ <execution>
+ <id>copy-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>copy-resources</goal>
+ </goals>
+ <configuration>
+ <outputDirectory>${project.basedir}/target/classes/META-INF/services</outputDirectory>
+ <resources>
+ <resource>
+ <directory>${project.basedir}/src/main/services</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</profile>
</profiles>
+ <dependencies>
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
<build>
<plugins>
<plugin>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
new file mode 100644
index 0000000..e12a509
--- /dev/null
+++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.hadoop;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * chroot-style local file system for tests.
+ *
+ * Every path handed to this file system is re-rooted beneath a per-jail base directory
+ * ({@code <base>/jail-fs/<host part of the jail URI>}) and the operation is delegated to a
+ * {@link LocalFileSystem}. File statuses returned to callers are translated back so that they
+ * carry jail-relative paths qualified with this file system's URI.
+ */
+public class JailedFileSystem extends FileSystem {
+    private URI uri;
+    private String basePath;
+    private LocalFileSystem localFS;
+
+    public JailedFileSystem() {
+        localFS = new LocalFileSystem();
+    }
+
+    /**
+     * Initializes the delegate local file system and derives the jail base directory.
+     *
+     * The base directory is taken from the {@code jail.base} configuration key; when absent it
+     * falls back to the {@code hadoop.tmp.dir} system property, then to the {@code hadoop.tmp.dir}
+     * configuration key, and finally to {@code /tmp}. The host part of the URI names the jail.
+     *
+     * @param name jail URI; its host component is mandatory
+     * @param conf configuration used for both this file system and the local delegate
+     * @throws IOException if the URI has no host component or the delegate fails to initialize
+     */
+    @Override
+    public void initialize(URI name, Configuration conf) throws IOException {
+        super.initialize(name, conf);
+        setConf(conf);
+        localFS.initialize(LocalFileSystem.getDefaultUri(conf), conf);
+        String base = name.getHost();
+        if (base == null) {
+            throw new IOException("Incomplete Jail URI, no jail base: "+ name);
+        }
+        basePath = new Path(conf.get("jail.base", System.getProperty("hadoop.tmp.dir",
+                conf.get("hadoop.tmp.dir", "/tmp"))) + "/jail-fs/" + base).toUri().getPath();
+        this.uri = URI.create(name.getScheme()+"://"+name.getAuthority());
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    /**
+     * Maps a jail path onto the local disk by prefixing it with the jail base directory.
+     * Must be applied exactly once per caller-supplied path.
+     */
+    private Path toLocalPath(Path f) {
+        return new Path(basePath + f.toUri().getPath());
+    }
+
+    /**
+     * Maps a local path back into the jail by removing the jail base directory from it.
+     * The base directory is matched literally, so regex metacharacters in the directory name
+     * (for example {@code +} or parentheses) cannot corrupt the translation.
+     */
+    private Path fromLocalPath(Path path) {
+        String pathString = path.toUri().getPath()
+                .replaceFirst(java.util.regex.Pattern.quote(basePath), "");
+        return new Path(pathString.isEmpty() ? "/" : pathString);
+    }
+
+    /**
+     * Re-wraps a status obtained from the local delegate so that its path is jail-relative and
+     * qualified against this file system. All other attributes are copied unchanged.
+     */
+    private FileStatus toJailStatus(FileStatus status) {
+        return new FileStatus(status.getLen(), status.isDir(),
+                status.getReplication(), status.getBlockSize(), status.getModificationTime(),
+                status.getAccessTime(), status.getPermission(), status.getOwner(), status.getGroup(),
+                fromLocalPath(status.getPath()).makeQualified(this.getUri(), this.getWorkingDirectory()));
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        return localFS.open(toLocalPath(f), bufferSize);
+    }
+
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+                                     short replication, long blockSize, Progressable progress) throws IOException {
+        return localFS.create(toLocalPath(f), permission, overwrite, bufferSize,
+                replication, blockSize, progress);
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+        return localFS.append(toLocalPath(f), bufferSize, progress);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        return localFS.rename(toLocalPath(src), toLocalPath(dst));
+    }
+
+    /**
+     * Non-recursive delete.
+     *
+     * The jail path is passed through untouched: {@link #delete(Path, boolean)} performs the
+     * translation to the local path, and translating here as well would prefix the base twice.
+     */
+    @Override
+    public boolean delete(Path f) throws IOException {
+        return delete(f, false);
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        return localFS.delete(toLocalPath(f), recursive);
+    }
+
+    /**
+     * Lists the children of a jail path.
+     *
+     * @return the delegate's result as-is when it is {@code null} or empty, otherwise the
+     *         statuses re-wrapped with jail-relative, qualified paths
+     */
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+        FileStatus[] fileStatuses = localFS.listStatus(toLocalPath(f));
+        if (fileStatuses == null || fileStatuses.length == 0) {
+            return fileStatuses;
+        }
+        FileStatus[] jailFileStatuses = new FileStatus[fileStatuses.length];
+        for (int index = 0; index < fileStatuses.length; index++) {
+            jailFileStatuses[index] = toJailStatus(fileStatuses[index]);
+        }
+        return jailFileStatuses;
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** The working directory is fixed at {@code /user/<user.name system property>}. */
+    @Override
+    public Path getWorkingDirectory() {
+        return new Path("/user/" + System.getProperty("user.name"));
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        return localFS.mkdirs(toLocalPath(f), permission);
+    }
+
+    /**
+     * Computes an MD5 checksum over the whole file content, which is read fully into memory.
+     * The returned checksum does not serialize any state in {@code write}/{@code readFields}.
+     */
+    @Override
+    public FileChecksum getFileChecksum(Path f) throws IOException {
+        final byte[] md5 = DigestUtils.md5(FileUtils.readFileToByteArray(new File(toLocalPath(f).toString())));
+        return new FileChecksum() {
+
+            @Override
+            public String getAlgorithmName() {
+                return "MD5";
+            }
+
+            @Override
+            public int getLength() {
+                return md5.length;
+            }
+
+            @Override
+            public byte[] getBytes() {
+                return md5;
+            }
+
+            @Override
+            public void write(DataOutput out) throws IOException {
+            }
+
+            @Override
+            public void readFields(DataInput in) throws IOException {
+            }
+        };
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        FileStatus status = localFS.getFileStatus(toLocalPath(f));
+        if (status == null) {
+            return null;
+        }
+        return toJailStatus(status);
+    }
+
+    /**
+     * Forwards the timestamp change to the local delegate for the translated path.
+     * The base {@code FileSystem} implementation ignores the request, so calling it instead
+     * would silently discard the times callers ask for.
+     */
+    @Override
+    public void setTimes(Path p, long mtime, long atime) throws IOException {
+        localFS.setTimes(toLocalPath(p), mtime, atime);
+    }
+
+    @Override
+    public void close() throws IOException {
+        localFS.close();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
new file mode 100644
index 0000000..d8a60f6
--- /dev/null
+++ b/hadoop-dependencies/src/main/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider
@@ -0,0 +1,14 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.hadoop.mapred.ClassicClientProtocolProvider
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
new file mode 100644
index 0000000..079eca9
--- /dev/null
+++ b/hadoop-dependencies/src/versioned-src/v2/java/org/apache/hadoop/mapred/ClassicClientProtocolProvider.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Client protocol provider used by Hadoop v2 based tests: hands out an in-process
+ * {@link LocalJobRunner} when the configuration points at a local unit-test setup.
+ */
+public class ClassicClientProtocolProvider extends ClientProtocolProvider {
+
+    private static final String LOCALHOST = "localhost";
+
+    /**
+     * Returns a {@link LocalJobRunner} when the framework name is {@code unittests} (also the
+     * default when unset) and the job tracker / resource manager address starts with
+     * {@code localhost}; returns {@code null} otherwise.
+     */
+    @Override
+    public ClientProtocol create(Configuration conf) throws IOException {
+        final String frameworkName = conf.get(MRConfig.FRAMEWORK_NAME, "unittests");
+        // The resource manager address is only the fallback for a missing job tracker setting.
+        final String resourceManager = conf.get("yarn.resourcemanager.address", LOCALHOST);
+        final String jobTracker = conf.get("mapred.job.tracker", resourceManager);
+
+        final boolean localUnitTest = "unittests".equals(frameworkName) && jobTracker.startsWith(LOCALHOST);
+        if (localUnitTest) {
+            return new LocalJobRunner(conf);
+        }
+        return null;
+    }
+
+    /** The socket address is ignored; the decision rests on the configuration alone. */
+    @Override
+    public ClientProtocol create(InetSocketAddress addr, Configuration conf) throws IOException {
+        return create(conf);
+    }
+
+    /** Nothing to release for the local runner. */
+    @Override
+    public void close(ClientProtocol clientProtocol) throws IOException {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/pom.xml b/hadoop-webapp/pom.xml
index 7640883..e576310 100644
--- a/hadoop-webapp/pom.xml
+++ b/hadoop-webapp/pom.xml
@@ -18,7 +18,7 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -44,6 +44,29 @@
<scope>compile</scope>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.5</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/versioned-src/v1/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<profile>
<id>hadoop-2</id>
@@ -71,6 +94,11 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-nodemanager</artifactId>
</dependency>
</dependencies>
@@ -95,6 +123,22 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-test-util</artifactId>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-test</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
@@ -125,6 +169,11 @@
<groupId>org.apache.hcatalog</groupId>
<artifactId>webhcat-java-client</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java b/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
new file mode 100644
index 0000000..fa3a84a
--- /dev/null
+++ b/hadoop-webapp/src/main/java/org/apache/falcon/JobTrackerService.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon;
+
+/**
+ * Job Tracker service interface.
+ */
+public interface JobTrackerService {
+
+ void start() throws Exception;
+
+ void stop() throws Exception;
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java b/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
index c2bb6f7..1468ac4 100644
--- a/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
+++ b/hadoop-webapp/src/main/java/org/apache/falcon/listener/HadoopStartupListener.java
@@ -18,22 +18,14 @@
package org.apache.falcon.listener;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
import org.apache.activemq.broker.BrokerService;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.falcon.JobTrackerService;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Logger;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
/**
* Listener for bootstrapping embedded hadoop cluster for integration tests.
*/
@@ -44,80 +36,27 @@ public class HadoopStartupListener implements ServletContextListener {
@Override
public void contextInitialized(ServletContextEvent sce) {
try {
- FileUtils.deleteDirectory(new File(System.getProperty("hadoop.tmp.dir")));
- final Configuration conf = new Configuration();
-
- NameNode.format(conf);
- final String[] emptyArgs = {};
- try {
- Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");
- startHadoop2Services(conf, emptyArgs);
- } catch (ClassNotFoundException cnfe) {
- startHadoop1Services(conf, emptyArgs);
- }
+ startLocalJobRunner();
startBroker();
-
startHiveMetaStore();
} catch (Exception e) {
- e.printStackTrace();
- LOG.error("Unable to start hadoop cluster", e);
- throw new RuntimeException("Unable to start hadoop cluster", e);
+ LOG.error("Unable to start daemons", e);
+ throw new RuntimeException("Unable to start daemons", e);
}
}
- private void startHadoop1Services(Configuration conf, String[] emptyArgs)
- throws IOException, IllegalAccessException, InvocationTargetException,
- NoSuchMethodException, ClassNotFoundException, InstantiationException {
-
- NameNode.createNameNode(emptyArgs, conf);
- DataNode.createDataNode(emptyArgs, conf);
-
- JobConf jobConf = new JobConf(conf);
- // JobTracker jt = JobTracker.startTracker(jobConf);
- // jt.offerService();
- // TaskTracker tt = new TaskTracker(jobConf);
- // tt.run();
-
- Object jt = Class.forName("org.apache.hadoop.mapred.JobTracker")
- .getMethod("startTracker", JobConf.class).invoke(null, jobConf);
- startService(jt, "offerService");
-
- Object tt = Class.forName("org.apache.hadoop.mapred.TaskTracker")
- .getConstructor(JobConf.class).newInstance(jobConf);
- startService(tt, "run");
- }
-
- private void startHadoop2Services(Configuration conf, String[] emptyArgs) throws Exception {
-
- // DefaultMetricsSystem.setMiniClusterMode(true);
- // ResourceManager resourceManager = new ResourceManager(new MemStore());
- // YarnConfiguration yarnConf = new YarnConfiguration(conf);
- // resourceManager.init(yarnConf);
- // resourceManager.start();
- // NodeManager nodeManager = new NodeManager();
- // nodeManager.init(yarnConf);
- // nodeManager.start();
-
- Class.forName("org.apache.hadoop.metrics2.lib.DefaultMetricsSystem")
- .getMethod("setMiniClusterMode", boolean.class).invoke(null, true);
-
- NameNode.createNameNode(emptyArgs, conf);
- DataNode.createDataNode(emptyArgs, conf);
-
- Object memStore = instance("org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore");
- Object resourceManager = Class.forName("org.apache.hadoop.yarn.server.resourcemanager.ResourceManager")
- .getConstructor(Class.forName("org.apache.hadoop.yarn.server.resourcemanager.recovery.Store"))
- .newInstance(memStore);
- Object yarnConf = Class.forName("org.apache.hadoop.yarn.conf.YarnConfiguration")
- .getConstructor(Configuration.class).newInstance(conf);
- invoke(resourceManager, "init", Configuration.class, yarnConf);
- startService(resourceManager, "start");
-
- Object nodeManager = instance("org.apache.hadoop.yarn.server.nodemanager.NodeManager");
- invoke(nodeManager, "init", Configuration.class, yarnConf);
- startService(nodeManager, "start");
+ @SuppressWarnings("unchecked")
+ private void startLocalJobRunner() throws Exception {
+ String className = "org.apache.hadoop.mapred.LocalRunnerV1";
+ try {
+ Class<? extends JobTrackerService> runner = (Class<? extends JobTrackerService>) Class.forName(className);
+ JobTrackerService service = runner.newInstance();
+ service.start();
+ } catch (ClassNotFoundException e) {
+ LOG.warn("v1 Hadoop components not found. Assuming v2", e);
+ }
}
private void startBroker() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/core-site.xml b/hadoop-webapp/src/main/resources/core-site.xml
index 484e904..615b4d9 100644
--- a/hadoop-webapp/src/main/resources/core-site.xml
+++ b/hadoop-webapp/src/main/resources/core-site.xml
@@ -25,8 +25,13 @@
</property>
<property>
+ <name>fs.jail.impl</name>
+ <value>org.apache.falcon.hadoop.JailedFileSystem</value>
+ </property>
+
+ <property>
<name>fs.default.name</name>
- <value>hdfs://localhost:41020</value>
+ <value>jail://global:00</value>
</property>
<property>
@@ -46,4 +51,5 @@
<value>${project.build.directory}/tmp-hadoop-${user.name}</value>
<description>A base for other temporary directories.</description>
</property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/hive-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/hive-site.xml b/hadoop-webapp/src/main/resources/hive-site.xml
index 49cda78..f03a3a6 100644
--- a/hadoop-webapp/src/main/resources/hive-site.xml
+++ b/hadoop-webapp/src/main/resources/hive-site.xml
@@ -31,7 +31,7 @@
<property>
<name>fs.default.name</name>
- <value>hdfs://localhost:41020</value>
+ <value>jail://global:00</value>
</property>
<!-- Forcing the creation of the db dir under target -->
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/main/resources/yarn-site.xml
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/main/resources/yarn-site.xml b/hadoop-webapp/src/main/resources/yarn-site.xml
index 587f1c5..658752b 100644
--- a/hadoop-webapp/src/main/resources/yarn-site.xml
+++ b/hadoop-webapp/src/main/resources/yarn-site.xml
@@ -29,7 +29,7 @@
<property>
<name>mapreduce.framework.name</name>
- <value>yarn</value>
+ <value>unittests</value>
</property>
<property>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java b/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
new file mode 100644
index 0000000..c96bb0e
--- /dev/null
+++ b/hadoop-webapp/src/test/java/org/apache/hadoop/mapred/LocalRunnerTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.conf.Configuration;
+import org.testng.annotations.Test;
+
+/**
+ * Test for LocalRunner.
+ */
+@Test (enabled = false)
+public class LocalRunnerTest {
+
+ @SuppressWarnings("unchecked")
+ public void testLocalRunner() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("mapred.job.tracker", "localhost:41021");
+ conf.set("mapreduce.framework.name", "unittests");
+ String hadoopProfle = System.getProperty("hadoop.profile", "1");
+ if (hadoopProfle.equals("1")) {
+ String className = "org.apache.hadoop.mapred.LocalRunnerV1";
+ Class<? extends JobTrackerService> runner =
+ (Class<? extends JobTrackerService>) Class.forName(className);
+ JobTrackerService service = runner.newInstance();
+ service.start();
+ }
+ JobClient client = new JobClient(new JobConf(conf));
+ System.out.println(client.getSystemDir());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java b/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
new file mode 100644
index 0000000..5819cce
--- /dev/null
+++ b/hadoop-webapp/src/versioned-src/v1/java/org/apache/hadoop/mapred/LocalRunnerV1.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Hosted Local Job runner.
+ * Please note that one of org.apache.hadoop.mapred.LocalRunnerV1 or
+ * org.apache.hadoop.mapred.LocalRunnerV2 is active in the project depending
+ * on the profile chosen.
+ */
+public class LocalRunnerV1 implements JobSubmissionProtocol, JobTrackerService {
+
+ private final JobSubmissionProtocol localProxy;
+ private final JobConf conf;
+ private RPC.Server server;
+
+ public LocalRunnerV1() { // builds a default JobConf and an in-process LocalJobRunner
+ try {
+ conf = new JobConf();
+ localProxy = new LocalJobRunner(conf); // runner the protocol methods delegate to
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to initialize localRunner", e); // preserve root cause
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ String[] tracker = conf.get("mapred.job.tracker", "localhost:41021").split(":");
+ server = RPC.getServer(this, tracker[0], Integer.parseInt(tracker[1]), conf);
+ server.start();
+ }
+
+ @Override
+ public void stop() throws Exception {
+ server.stop();
+ }
+
+ @Override
+ public JobID getNewJobId() throws IOException {
+ return localProxy.getNewJobId();
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobName, String jobSubmitDir, Credentials ts) throws IOException {
+ return localProxy.submitJob(jobName, jobSubmitDir, ts);
+ }
+
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+ return localProxy.getClusterStatus(detailed);
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ return localProxy.getQueueAdmins(queueName);
+ }
+
+ @Override
+ public void killJob(JobID jobid) throws IOException {
+ localProxy.killJob(jobid);
+ }
+
+ @Override
+ public void setJobPriority(JobID jobid, String priority) throws IOException {
+ localProxy.setJobPriority(jobid, priority);
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
+ return localProxy.killTask(taskId, shouldFail);
+ }
+
+ @Override
+ public JobProfile getJobProfile(JobID jobid) throws IOException {
+ return localProxy.getJobProfile(jobid);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobid) throws IOException {
+ return localProxy.getJobStatus(jobid);
+ }
+
+ @Override
+ public Counters getJobCounters(JobID jobid) throws IOException {
+ return localProxy.getJobCounters(jobid);
+ }
+
+ @Override
+ public TaskReport[] getMapTaskReports(JobID jobid) throws IOException {
+ return localProxy.getMapTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException {
+ return localProxy.getReduceTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getCleanupTaskReports(JobID jobid) throws IOException {
+ return localProxy.getCleanupTaskReports(jobid);
+ }
+
+ @Override
+ public TaskReport[] getSetupTaskReports(JobID jobid) throws IOException {
+ return localProxy.getSetupTaskReports(jobid);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException {
+ return localProxy.getFilesystemName();
+ }
+
+ @Override
+ public JobStatus[] jobsToComplete() throws IOException {
+ return localProxy.jobsToComplete();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException {
+ return localProxy.getAllJobs();
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+ throws IOException {
+ return localProxy.getTaskCompletionEvents(jobid, fromEventId, maxEvents);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException {
+ return localProxy.getTaskDiagnostics(taskId);
+ }
+
+ @Override
+ public String getSystemDir() {
+ return localProxy.getSystemDir();
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException {
+ return localProxy.getStagingAreaDir();
+ }
+
+ @Override
+ public JobQueueInfo[] getQueues() throws IOException {
+ return localProxy.getQueues();
+ }
+
+ @Override
+ public JobQueueInfo getQueueInfo(String queue) throws IOException {
+ return localProxy.getQueueInfo(queue);
+ }
+
+ @Override
+ public JobStatus[] getJobsFromQueue(String queue) throws IOException {
+ return localProxy.getJobsFromQueue(queue);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException {
+ return localProxy.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
+ return new Token<DelegationTokenIdentifier>(null, null, null, null);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ return localProxy.renewDelegationToken(token);
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ localProxy.cancelDelegationToken(token);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return localProxy.getProtocolVersion(protocol, clientVersion);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
----------------------------------------------------------------------
diff --git a/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java b/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
new file mode 100644
index 0000000..ccb1bd5
--- /dev/null
+++ b/hadoop-webapp/src/versioned-src/v2/java/org/apache/hadoop/mapred/LocalRunnerV2.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import org.apache.falcon.JobTrackerService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.QueueAclsInfo;
+import org.apache.hadoop.mapreduce.QueueInfo;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.mapreduce.TaskTrackerInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.LogParams;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+
+/**
+ * Local Job Runner for Hadoop v2.
+ * Please note that one of org.apache.hadoop.mapred.LocalRunnerV1 or
+ * org.apache.hadoop.mapred.LocalRunnerV2 is active in the project depending
+ * on the profile chosen.
+ */
+public class LocalRunnerV2 implements ClientProtocol, JobTrackerService {
+
+ private final ClientProtocol localProxy;
+ private final Configuration conf;
+ private Server server;
+
+ public LocalRunnerV2() { // builds a default Configuration and an in-process LocalJobRunner
+ try {
+ conf = new Configuration();
+ localProxy = new LocalJobRunner(conf); // runner the protocol methods delegate to
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to initialize localRunner", e); // preserve root cause
+ }
+ }
+
+ @Override
+ public void start() throws Exception {
+ server = new RPC.Builder(conf).setBindAddress("0.0.0.0").setPort(41021).setInstance(this).
+ setProtocol(ClientProtocol.class).build();
+ server.start();
+ }
+
+ public void stop() throws Exception {
+ server.stop();
+ }
+
+ @Override
+ public JobID getNewJobID() throws IOException, InterruptedException {
+ return localProxy.getNewJobID();
+ }
+
+ @Override
+ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException {
+ return localProxy.submitJob(jobId, jobSubmitDir, ts);
+ }
+
+ @Override
+ public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
+ return localProxy.getClusterMetrics();
+ }
+
+ @Override
+ public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
+ return localProxy.getJobTrackerStatus();
+ }
+
+ @Override
+ public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
+ return localProxy.getTaskTrackerExpiryInterval();
+ }
+
+ @Override
+ public AccessControlList getQueueAdmins(String queueName) throws IOException {
+ return localProxy.getQueueAdmins(queueName);
+ }
+
+ @Override
+ public void killJob(JobID jobid) throws IOException, InterruptedException {
+ localProxy.killJob(jobid);
+ }
+
+ @Override
+ public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
+ localProxy.setJobPriority(jobid, priority);
+ }
+
+ @Override
+ public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException, InterruptedException {
+ return localProxy.killTask(taskId, shouldFail);
+ }
+
+ @Override
+ public JobStatus getJobStatus(JobID jobid) throws IOException, InterruptedException {
+ return localProxy.getJobStatus(jobid);
+ }
+
+ @Override
+ public Counters getJobCounters(JobID jobid) throws IOException, InterruptedException {
+ return localProxy.getJobCounters(jobid);
+ }
+
+ @Override
+ public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
+ return localProxy.getTaskReports(jobid, type);
+ }
+
+ @Override
+ public String getFilesystemName() throws IOException, InterruptedException {
+ return localProxy.getFilesystemName();
+ }
+
+ @Override
+ public JobStatus[] getAllJobs() throws IOException, InterruptedException {
+ return localProxy.getAllJobs();
+ }
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
+ throws IOException, InterruptedException {
+ return localProxy.getTaskCompletionEvents(jobid, fromEventId, maxEvents);
+ }
+
+ @Override
+ public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
+ return localProxy.getTaskDiagnostics(taskId);
+ }
+
+ @Override
+ public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
+ return localProxy.getActiveTrackers();
+ }
+
+ @Override
+ public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
+ return localProxy.getBlacklistedTrackers();
+ }
+
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+ return localProxy.getSystemDir();
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+ return localProxy.getStagingAreaDir();
+ }
+
+ @Override
+ public String getJobHistoryDir() throws IOException, InterruptedException {
+ return localProxy.getJobHistoryDir();
+ }
+
+ @Override
+ public QueueInfo[] getQueues() throws IOException, InterruptedException {
+ return localProxy.getQueues();
+ }
+
+ @Override
+ public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
+ return localProxy.getQueue(queueName);
+ }
+
+ @Override
+ public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
+ return localProxy.getQueueAclsForCurrentUser();
+ }
+
+ @Override
+ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
+ return localProxy.getRootQueues();
+ }
+
+ @Override
+ public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
+ return localProxy.getChildQueues(queueName);
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException, InterruptedException {
+ return localProxy.getDelegationToken(renewer);
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ return localProxy.renewDelegationToken(token);
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException, InterruptedException {
+ localProxy.cancelDelegationToken(token);
+ }
+
+ @Override
+ public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
+ throws IOException, InterruptedException {
+ return localProxy.getLogFileParams(jobID, taskAttemptID);
+ }
+
+ @Override
+ public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
+ return localProxy.getProtocolVersion(protocol, clientVersion);
+ }
+
+ @Override
+ public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
+ throws IOException {
+ return localProxy.getProtocolSignature(protocol, clientVersion, clientMethodsHash);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/pom.xml
----------------------------------------------------------------------
diff --git a/messaging/pom.xml b/messaging/pom.xml
index a59c1e3..9aa5347 100644
--- a/messaging/pom.xml
+++ b/messaging/pom.xml
@@ -87,6 +87,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.falcon</groupId>
+ <artifactId>falcon-hadoop-dependencies</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
index 27bea68..da126c7 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FalconTopicProducerTest.java
@@ -138,7 +138,7 @@ public class FalconTopicProducerTest {
}
private List<String> createCommonArgs() {
- List<String> args = new ArrayList<String>(Arrays.asList(
+ return new ArrayList<String>(Arrays.asList(
"-" + ARG.workflowId.getArgName(), "workflow-01-00",
"-" + ARG.runId.getArgName(), "1",
"-" + ARG.nominalTime.getArgName(), "2011-01-01-01-00",
@@ -151,7 +151,6 @@ public class FalconTopicProducerTest {
"-" + ARG.status.getArgName(), ("SUCCEEDED"),
"-" + ARG.brokerTTL.getArgName(), "10",
"-" + ARG.cluster.getArgName(), "corp"));
- return args;
}
private void testProcessMessageCreator(final List<String[]> messages,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index a1609af..e707567 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -138,7 +138,7 @@ public class FeedProducerTest {
}
};
t.start();
- Thread.sleep(1500);
+ Thread.sleep(100);
new MessageProducer().run(this.args);
t.join();
if (error != null) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 078b9c2..3a40e76 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -92,7 +92,7 @@ public class ProcessProducerTest {
}
};
t.start();
- Thread.sleep(1500);
+ Thread.sleep(100);
new MessageProducer().run(this.args);
t.join();
if (error != null) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index 1ed4f6b..f1fba81 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -40,6 +40,12 @@
</dependency>
<dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/d1642bea/oozie-el-extensions/pom.xml
----------------------------------------------------------------------
diff --git a/oozie-el-extensions/pom.xml b/oozie-el-extensions/pom.xml
index 4c76c89..06dbec4 100644
--- a/oozie-el-extensions/pom.xml
+++ b/oozie-el-extensions/pom.xml
@@ -67,6 +67,12 @@
</dependency>
<dependency>
+ <groupId>javax.servlet.jsp</groupId>
+ <artifactId>jsp-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
</dependency>