You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/07/10 22:05:33 UTC
[3/8] FALCON-1 Create packaging and scripts to install and try Apache
Falcon. Contributed by Srikanth Sundarrajan
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/cli/FalconCLITest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLITest.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLITest.java
deleted file mode 100644
index 64f0171..0000000
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLITest.java
+++ /dev/null
@@ -1,572 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.cli;
-
-import junit.framework.Assert;
-import org.apache.falcon.resource.AbstractTestBase;
-import org.testng.annotations.Test;
-
-import java.io.*;
-import java.util.Map;
-
-/**
- * Test for Falcon CLI.
- *
- * todo: Refactor both the classes to move this methods to helper;
- */
-public class FalconCLITest extends AbstractTestBase {
-
- private InMemoryWriter stream = new InMemoryWriter(System.out);
- // private static final String BROKER_URL =
- // "tcp://localhost:61616?daemon=true";
- private static final boolean TEST_ENABLED = true;
-
- @Test(enabled = TEST_ENABLED)
- public void testSubmitEntityValidCommands() throws Exception {
-
- FalconCLI.OUT.set(stream);
-
- String filePath;
- Map<String, String> overlay = getUniqueOverlay();
-
- filePath = overlayParametersOverTemplate(clusterFileTemplate, overlay);
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type cluster -file " + filePath));
- Assert.assertEquals(stream.buffer.toString().trim(),
- "default/Submit successful (cluster) " + clusterName);
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
- Assert.assertEquals(
- stream.buffer.toString().trim(),
- "default/Submit successful (feed) "
- + overlay.get("inputFeedName"));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE2, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
- Assert.assertEquals(
- stream.buffer.toString().trim(),
- "default/Submit successful (feed) "
- + overlay.get("outputFeedName"));
-
- filePath = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type process -file " + filePath));
- Assert.assertEquals(
- stream.buffer.toString().trim(),
- "default/Submit successful (process) "
- + overlay.get("processName"));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testListWithEmptyConfigStore() throws Exception {
- Assert.assertEquals(
- 0,
- executeWithURL("entity -list -type process "));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testSubmitAndScheduleEntityValidCommands() throws Exception {
-
- Thread.sleep(5000);
- String filePath;
- Map<String, String> overlay = getUniqueOverlay();
-
- filePath = overlayParametersOverTemplate(clusterFileTemplate, overlay);
- Assert.assertEquals(-1,
- executeWithURL("entity -submitAndSchedule -type cluster -file "
- + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submitAndSchedule -type feed -file "
- + filePath));
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE2, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submitAndSchedule -type feed -file "
- + filePath));
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE2, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submitAndSchedule -type process -file "
- + filePath));
-
- Thread.sleep(5000);
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testValidateValidCommands() throws Exception {
-
- String filePath;
- Map<String, String> overlay = getUniqueOverlay();
-
- filePath = overlayParametersOverTemplate(clusterFileTemplate, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -validate -type cluster -file "
- + filePath));
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type cluster -file " + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -validate -type feed -file " + filePath));
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE2, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -validate -type feed -file " + filePath));
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -validate -type process -file "
- + filePath));
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type process -file " + filePath));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testDefinitionEntityValidCommands() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -definition -type cluster -name "
- + overlay.get("cluster")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -definition -type feed -name "
- + overlay.get("inputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -definition -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(0,
- executeWithURL("entity -definition -type process -name "
- + overlay.get("processName")));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testScheduleEntityValidCommands() throws Exception {
-
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(-1,
- executeWithURL("entity -schedule -type cluster -name "
- + overlay.get("cluster")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -schedule -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testSuspendResumeStatusEntityValidCommands() throws Exception {
-
- Thread.sleep(5000);
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -schedule -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- waitForProcessWFtoStart();
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -suspend -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -suspend -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -resume -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -resume -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -status -type process -name "
- + overlay.get("processName")));
-
- Thread.sleep(5000);
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testSubCommandPresence() throws Exception {
- Assert.assertEquals(-1, executeWithURL("entity -type cluster "));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testDeleteEntityValidCommands() throws Exception {
-
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(
- -1,
- executeWithURL("entity -delete -type cluster -name "
- + overlay.get("cluster")));
-
- Assert.assertEquals(
- -1,
- executeWithURL("entity -delete -type feed -name "
- + overlay.get("inputFeedName")));
-
- Assert.assertEquals(
- -1,
- executeWithURL("entity -delete -type feed -name "
- + overlay.get("outputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -delete -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -delete -type feed -name "
- + overlay.get("inputFeedName")));
-
- Assert.assertEquals(
- 0,
- executeWithURL("entity -delete -type feed -name "
- + overlay.get("outputFeedName")));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testInvalidCLIEntitycommands() throws Exception {
-
- Map<String, String> overlay = getUniqueOverlay();
- overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(-1,
- executeWithURL("entity -submit -type feed -name " + "name"));
-
- Assert.assertEquals(-1,
- executeWithURL("entity -schedule -type feed -file " + "name"));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testInstanceRunningAndStatusCommands() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type feed -name "
- + overlay.get("outputFeedName")));
- waitForProcessWFtoStart();
-
- Assert.assertEquals(0,
- executeWithURL("instance -status -type feed -name "
- + overlay.get("outputFeedName")
- + " -start " + START_INSTANCE));
-
- Assert.assertEquals(0,
- executeWithURL("instance -running -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(0,
- executeWithURL("instance -status -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testInstanceSuspendAndResume() throws Exception {
- Thread.sleep(5000);
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
-
- Assert.assertEquals(0,
- executeWithURL("instance -suspend -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
- Assert.assertEquals(0,
- executeWithURL("instance -resume -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
- Thread.sleep(5000);
- }
-
- private static final String START_INSTANCE = "2012-04-20T00:00Z";
-
- @Test(enabled = TEST_ENABLED)
- public void testInstanceKillAndRerun() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- waitForProcessWFtoStart();
- Assert.assertEquals(
- 0,
- executeWithURL("instance -kill -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
- Assert.assertEquals(
- 0,
- executeWithURL("instance -rerun -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -file "
- + createTempJobPropertiesFile()));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testContinue() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- waitForProcessWFtoStart();
- Assert.assertEquals(
- 0,
- executeWithURL("instance -kill -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
- Assert.assertEquals(
- 0,
- executeWithURL("instance -continue -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testInvalidCLIInstanceCommands() throws Exception {
- // no command
- Assert.assertEquals(-1, executeWithURL(" -kill -type process -name "
- + "name" + " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z"));
-
- Assert.assertEquals(-1, executeWithURL("instance -kill " + "name"
- + " -start 2010-01-01T01:00Z -end 2010-01-01T01:00Z"));
-
- Assert.assertEquals(-1,
- executeWithURL("instance -kill -type process -name " + "name"
- + " -end 2010-01-01T03:00Z"));
-
- Assert.assertEquals(-1,
- executeWithURL("instance -kill -type process -name "
- + " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z"));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testFalconURL() throws Exception {
- Assert.assertEquals(-1, new FalconCLI()
- .run(("instance -status -type process -name " + "processName"
- + " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z")
- .split("\\s")));
-
- Assert.assertEquals(-1, new FalconCLI()
- .run(("instance -status -type process -name "
- + "processName -url http://unknownhost:1234/"
- + " -start 2010-01-01T01:00Z -end 2010-01-01T03:00Z")
- .split("\\s")));
-
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testClientProperties() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(
- 0,
- new FalconCLI().run(("entity -schedule -type feed -name "
- + overlay.get("outputFeedName")).split("\\s")));
-
- Assert.assertEquals(0,
- new FalconCLI().run(("entity -schedule -type process -name "
- + overlay.get("processName")).split("\\s")));
-
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testGetVersion() throws Exception {
- Assert.assertEquals(0,
- new FalconCLI().run("admin -version".split("\\s")));
-
- Assert.assertEquals(0,
- new FalconCLI().run("admin -stack".split("\\s")));
- }
-
- @Test(enabled = TEST_ENABLED)
- public void testInstanceGetLogs() throws Exception {
- Map<String, String> overlay = getUniqueOverlay();
- submitTestFiles(overlay);
-
- Assert.assertEquals(0,
- executeWithURL("entity -schedule -type process -name "
- + overlay.get("processName")));
-
- Assert.assertEquals(0,
- executeWithURL("instance -logs -type process -name "
- + overlay.get("processName")
- + " -start " + START_INSTANCE + " -end " + START_INSTANCE));
-
- }
-
-
- private int executeWithURL(String command) throws Exception {
- return new FalconCLI()
- .run((command + " -url " + BASE_URL).split("\\s+"));
- }
-
- private String createTempJobPropertiesFile() throws IOException {
- File target = new File("webapp/target");
- if (!target.exists()) {
- target = new File("target");
- }
- File tmpFile = File.createTempFile("job", ".properties", target);
- OutputStream out = new FileOutputStream(tmpFile);
- out.write("oozie.wf.rerun.failnodes=true\n".getBytes());
- out.close();
- return tmpFile.getAbsolutePath();
- }
-
- public void submitTestFiles(Map<String, String> overlay) throws Exception {
-
- String filePath = overlayParametersOverTemplate(clusterFileTemplate,
- overlay);
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type cluster -file " + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE1, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(FEED_TEMPLATE2, overlay);
- Assert.assertEquals(0,
- executeWithURL("entity -submit -type feed -file " + filePath));
-
- filePath = overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
- Assert.assertEquals(
- 0,
- executeWithURL("entity -submit -type process -file " + filePath));
- }
-
- private static class InMemoryWriter extends PrintStream {
-
- private StringBuffer buffer = new StringBuffer();
-
- public InMemoryWriter(OutputStream out) {
- super(out);
- }
-
- @Override
- public void println(String x) {
- clear();
- buffer.append(x);
- super.println(x);
- }
-
- public String getBuffer() {
- return buffer.toString();
- }
-
- public void clear() {
- buffer.delete(0, buffer.length());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
new file mode 100644
index 0000000..ae4b5ad
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/logging/LogMoverIT.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.logging;
+
+import junit.framework.Assert;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.cluster.util.StandAloneCluster;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.TestContext;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowJob;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test for LogMover.
+ * Requires Oozie to be running on localhost.
+ */
+@Test
+public class LogMoverIT {
+
+ private static final ConfigurationStore STORE = ConfigurationStore.get();
+ private static final String PROCESS_NAME = "testProcess" + System.currentTimeMillis();
+ private static EmbeddedCluster testCluster = null;
+ private static Process testProcess = null;
+ private static FileSystem fs;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ Map<String, String> overlay = new HashMap<String, String>();
+ overlay.put("cluster", "testCluster");
+ TestContext context = new TestContext();
+ String file = context.
+ overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+ testCluster = StandAloneCluster.newCluster(file);
+ STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
+/*
+ new File("target/libs").mkdirs();
+ StartupProperties.get().setProperty("system.lib.location", "target/libs");
+ SharedLibraryHostingService listener = new SharedLibraryHostingService();
+ listener.onAdd(testCluster.getCluster());
+*/
+ fs = FileSystem.get(testCluster.getConf());
+ fs.mkdirs(new Path("/workflow/lib"));
+
+ fs.copyFromLocalFile(
+ new Path(LogMoverIT.class.getResource(
+ "/org/apache/falcon/logging/workflow.xml").toURI()),
+ new Path("/workflow"));
+ fs.copyFromLocalFile(
+ new Path(LogMoverIT.class.getResource(
+ "/org/apache/falcon/logging/java-test.jar").toURI()),
+ new Path("/workflow/lib"));
+
+ testProcess = new ProcessEntityParser().parse(LogMoverIT.class
+ .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
+ testProcess.setName(PROCESS_NAME);
+ }
+
+ @AfterClass
+ public void tearDown() {
+ testCluster.shutdown();
+ }
+
+ @Test (enabled = false)
+ public void testLogMover() throws Exception {
+ CurrentUser.authenticate(System.getProperty("user.name"));
+ OozieWorkflowEngine engine = new OozieWorkflowEngine();
+ String path = StartupProperties.get().getProperty("system.lib.location");
+ if (!new File("target/libs").exists()) {
+ Assert.assertTrue(new File("target/libs").mkdirs());
+ }
+ StartupProperties.get().setProperty("system.lib.location", "target/libs");
+ engine.schedule(testProcess);
+ StartupProperties.get().setProperty("system.lib.location", path);
+
+ OozieClient client = new OozieClient(
+ ClusterHelper.getOozieUrl(testCluster.getCluster()));
+ List<WorkflowJob> jobs;
+ while (true) {
+ jobs = client.getJobsInfo(OozieClient.FILTER_NAME + "="
+ + "FALCON_PROCESS_DEFAULT_" + PROCESS_NAME);
+ if (jobs.size() > 0) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+
+ WorkflowJob job = jobs.get(0);
+ while (true) {
+ if (!(job.getStatus() == WorkflowJob.Status.RUNNING || job
+ .getStatus() == WorkflowJob.Status.PREP)) {
+ break;
+ } else {
+ Thread.sleep(1000);
+ job = client.getJobInfo(job.getId());
+ }
+ }
+
+ Path oozieLogPath = new Path(getLogPath(),
+ "job-2010-01-01-01-00/000/oozie.log");
+ Assert.assertTrue(fs.exists(oozieLogPath));
+
+ testLogMoverWithNextRunId(job.getId());
+
+ }
+
+ private Path getLogPath() throws FalconException {
+ Path stagingPath = new Path(ClusterHelper.getLocation(
+ testCluster.getCluster(), "staging"),
+ EntityUtil.getStagingPath(testProcess) + "/../logs");
+ return new Path(ClusterHelper.getStorageUrl(testCluster
+ .getCluster()), stagingPath);
+ }
+
+ private void testLogMoverWithNextRunId(String jobId) throws Exception {
+ LogMover.main(new String[]{"-workflowEngineUrl",
+ ClusterHelper.getOozieUrl(testCluster.getCluster()),
+ "-subflowId", jobId + "@user-workflow", "-runId", "1",
+ "-logDir", getLogPath().toString() + "/job-2010-01-01-01-00",
+ "-status", "SUCCEEDED", "-entityType", "process", });
+
+ Path oozieLogPath = new Path(getLogPath(),
+ "job-2010-01-01-01-00/001/oozie.log");
+ Assert.assertTrue(fs.exists(oozieLogPath));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/logging/LogMoverTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogMoverTest.java b/webapp/src/test/java/org/apache/falcon/logging/LogMoverTest.java
deleted file mode 100644
index bd6dbe2..0000000
--- a/webapp/src/test/java/org/apache/falcon/logging/LogMoverTest.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import junit.framework.Assert;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.ClusterHelper;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.falcon.service.SharedLibraryHostingService;
-import org.apache.falcon.workflow.engine.OozieWorkflowEngine;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowJob;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Test for LogMover.
- * Requires Oozie to be running on localhost.
- */
-public class LogMoverTest {
-
- private static final ConfigurationStore STORE = ConfigurationStore.get();
- private static final String PROCESS_NAME = "testProcess" + System.currentTimeMillis();
- private static EmbeddedCluster testCluster = null;
- private static Process testProcess = null;
- private static FileSystem fs;
-
- @BeforeClass
- public void setup() throws Exception {
- cleanupStore();
- testCluster = EmbeddedCluster.newCluster("testCluster", true);
- STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
- SharedLibraryHostingService listener = new SharedLibraryHostingService();
- listener.onAdd(testCluster.getCluster());
- fs = FileSystem.get(testCluster.getConf());
- fs.mkdirs(new Path("/workflow/lib"));
-
- fs.copyFromLocalFile(
- new Path(LogMoverTest.class.getResource(
- "/org/apache/falcon/logging/workflow.xml").toURI()),
- new Path("/workflow"));
- fs.copyFromLocalFile(
- new Path(LogMoverTest.class.getResource(
- "/org/apache/falcon/logging/java-test.jar").toURI()),
- new Path("/workflow/lib"));
-
- testProcess = new ProcessEntityParser().parse(LogMoverTest.class
- .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
- testProcess.setName(PROCESS_NAME);
- STORE.publish(EntityType.PROCESS, testProcess);
- }
-
- @AfterClass
- public void tearDown() {
- testCluster.shutdown();
- }
-
- private void cleanupStore() throws FalconException {
- for (EntityType type : EntityType.values()) {
- Collection<String> entities = STORE.getEntities(type);
- for (String entity : entities) {
- STORE.remove(type, entity);
- }
- }
- }
-
- @Test
- public void testLogMover() throws Exception {
- CurrentUser.authenticate(System.getProperty("user.name"));
- OozieWorkflowEngine engine = new OozieWorkflowEngine();
- engine.schedule(testProcess);
-
- OozieClient client = new OozieClient(
- ClusterHelper.getOozieUrl(testCluster.getCluster()));
- List<WorkflowJob> jobs;
- while (true) {
- jobs = client.getJobsInfo(OozieClient.FILTER_NAME + "="
- + "FALCON_PROCESS_DEFAULT_" + PROCESS_NAME);
- if (jobs.size() > 0) {
- break;
- } else {
- Thread.sleep(100);
- }
- }
-
- WorkflowJob job = jobs.get(0);
- while (true) {
- if (!(job.getStatus() == WorkflowJob.Status.RUNNING || job
- .getStatus() == WorkflowJob.Status.PREP)) {
- break;
- } else {
- Thread.sleep(100);
- job = client.getJobInfo(job.getId());
- }
- }
-
- Path oozieLogPath = new Path(getLogPath(),
- "job-2010-01-01-01-00/000/oozie.log");
- Assert.assertTrue(fs.exists(oozieLogPath));
-
- testLogMoverWithNextRunId(job.getId());
-
- }
-
- private Path getLogPath() throws FalconException {
- Path stagingPath = new Path(ClusterHelper.getLocation(
- testCluster.getCluster(), "staging"),
- EntityUtil.getStagingPath(testProcess) + "/../logs");
- Path logPath = new Path(ClusterHelper.getStorageUrl(testCluster
- .getCluster()), stagingPath);
- return logPath;
- }
-
- private void testLogMoverWithNextRunId(String jobId) throws Exception {
- LogMover.main(new String[]{"-workflowEngineUrl",
- ClusterHelper.getOozieUrl(testCluster.getCluster()),
- "-subflowId", jobId + "@user-workflow", "-runId", "1",
- "-logDir", getLogPath().toString() + "/job-2010-01-01-01-00",
- "-status", "SUCCEEDED", "-entityType", "process", });
-
- Path oozieLogPath = new Path(getLogPath(),
- "job-2010-01-01-01-00/001/oozie.log");
- Assert.assertTrue(fs.exists(oozieLogPath));
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java b/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
new file mode 100644
index 0000000..25e0979
--- /dev/null
+++ b/webapp/src/test/java/org/apache/falcon/logging/LogProviderIT.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.falcon.logging;
+
+import org.apache.falcon.FalconException;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.cluster.util.StandAloneCluster;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.resource.InstancesResult.Instance;
+import org.apache.falcon.resource.InstancesResult.InstanceAction;
+import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
+import org.apache.falcon.resource.TestContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test for LogProvider.
+ */
+public class LogProviderIT {
+
+ private static final ConfigurationStore STORE = ConfigurationStore.get();
+ private static EmbeddedCluster testCluster = null;
+ private static Process testProcess = null;
+ private static final String PROCESS_NAME = "testProcess";
+ private static FileSystem fs;
+ private Instance instance;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ Map<String, String> overlay = new HashMap<String, String>();
+ overlay.put("cluster", "logProviderTest");
+ TestContext context = new TestContext();
+ String file = context.
+ overlayParametersOverTemplate(context.CLUSTER_TEMPLATE, overlay);
+ testCluster = StandAloneCluster.newCluster(file);
+ cleanupStore();
+ STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
+ fs = FileSystem.get(testCluster.getConf());
+ Path instanceLogPath = new Path(
+ "/projects/falcon/staging/falcon/workflows/process/" + PROCESS_NAME
+ + "/logs/job-2010-01-01-01-00/000");
+ fs.mkdirs(instanceLogPath);
+ fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
+ fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
+ fs.createNewFile(new Path(instanceLogPath, "mr_Action_FAILED.log"));
+ fs.createNewFile(new Path(instanceLogPath, "mr_Action2_SUCCEEDED.log"));
+
+ fs.mkdirs(new Path("/projects/falcon/staging/falcon/workflows/process/"
+ + PROCESS_NAME + "/logs/job-2010-01-01-01-00/001"));
+ fs.mkdirs(new Path("/projects/falcon/staging/falcon/workflows/process/"
+ + PROCESS_NAME + "/logs/job-2010-01-01-01-00/002"));
+ Path run3 = new Path("/projects/falcon/staging/falcon/workflows/process/"
+ + PROCESS_NAME + "/logs/job-2010-01-01-01-00/003");
+ fs.mkdirs(run3);
+ fs.createNewFile(new Path(run3, "oozie.log"));
+
+ testProcess = new ProcessEntityParser().parse(LogProviderIT.class
+ .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
+ testProcess.setName(PROCESS_NAME);
+ STORE.publish(EntityType.PROCESS, testProcess);
+ }
+
+ @BeforeMethod
+ public void setInstance() {
+ instance = new Instance();
+ instance.status = WorkflowStatus.SUCCEEDED;
+ instance.instance = "2010-01-01T01:00Z";
+ instance.cluster = "logProviderTest";
+ instance.logFile = "http://localhost:41000/oozie/wflog";
+ }
+
+ private void cleanupStore() throws FalconException {
+ for (EntityType type : EntityType.values()) {
+ Collection<String> entities = STORE.getEntities(type);
+ for (String entity : entities) {
+ STORE.remove(type, entity);
+ }
+ }
+ }
+
+ @Test
+ public void testLogProviderWithValidRunId() throws FalconException {
+ LogProvider provider = new LogProvider();
+ Instance instanceWithLog = provider.populateLogUrls(testProcess,
+ instance, "0");
+ Assert.assertEquals(
+ instance.logFile,
+ "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
+ + "job-2010-01-01-01-00/000/oozie.log");
+
+ InstanceAction action = instanceWithLog.actions[0];
+ Assert.assertEquals(action.action, "mr_Action2");
+ Assert.assertEquals(action.status, "SUCCEEDED");
+ Assert.assertEquals(
+ action.logFile,
+ "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
+ + "job-2010-01-01-01-00/000/mr_Action2_SUCCEEDED.log");
+
+ action = instanceWithLog.actions[1];
+ Assert.assertEquals(action.action, "mr_Action");
+ Assert.assertEquals(action.status, "FAILED");
+ Assert.assertEquals(
+ action.logFile,
+ "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
+ + "job-2010-01-01-01-00/000/mr_Action_FAILED.log");
+ }
+
+ @Test
+ public void testLogProviderWithInvalidRunId() throws FalconException {
+ LogProvider provider = new LogProvider();
+ provider.populateLogUrls(testProcess, instance, "x");
+ Assert.assertEquals(instance.logFile,
+ "http://localhost:41000/oozie/wflog");
+ }
+
+ @Test
+ public void testLogProviderWithUnavailableRunId() throws FalconException {
+ LogProvider provider = new LogProvider();
+ instance.logFile = null;
+ provider.populateLogUrls(testProcess, instance, "7");
+ Assert.assertEquals(instance.logFile, "-");
+ }
+
+ @Test
+ public void testLogProviderWithEmptyRunId() throws FalconException {
+ LogProvider provider = new LogProvider();
+ instance.logFile = null;
+ provider.populateLogUrls(testProcess, instance, null);
+ Assert.assertEquals(
+ instance.logFile,
+ "http://localhost:50070/data/projects/falcon/staging/falcon/workflows/process/testProcess/logs/"
+ + "job-2010-01-01-01-00/003/oozie.log");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/logging/LogProviderTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/logging/LogProviderTest.java b/webapp/src/test/java/org/apache/falcon/logging/LogProviderTest.java
deleted file mode 100644
index a3ebaf0..0000000
--- a/webapp/src/test/java/org/apache/falcon/logging/LogProviderTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.logging;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.parser.ProcessEntityParser;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.resource.InstancesResult.Instance;
-import org.apache.falcon.resource.InstancesResult.InstanceAction;
-import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.Collection;
-
-/**
- * Test for LogProvider.
- */
-public class LogProviderTest {
-
- private static final ConfigurationStore STORE = ConfigurationStore.get();
- private static EmbeddedCluster testCluster = null;
- private static Process testProcess = null;
- private static final String PROCESS_NAME = "testProcess";
- private static FileSystem fs;
- private Instance instance;
-
- @BeforeClass
- public void setup() throws Exception {
- testCluster = EmbeddedCluster.newCluster("testCluster", false);
- cleanupStore();
- STORE.publish(EntityType.CLUSTER, testCluster.getCluster());
- fs = FileSystem.get(testCluster.getConf());
- Path instanceLogPath = new Path(
- "/workflow/staging/falcon/workflows/process/" + PROCESS_NAME
- + "/logs/job-2010-01-01-01-00/000");
- fs.mkdirs(instanceLogPath);
- fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
- fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
- fs.createNewFile(new Path(instanceLogPath, "mr_Action_FAILED.log"));
- fs.createNewFile(new Path(instanceLogPath, "mr_Action2_SUCCEEDED.log"));
-
- fs.mkdirs(new Path("/workflow/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/001"));
- fs.mkdirs(new Path("/workflow/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/002"));
- Path run3 = new Path("/workflow/staging/falcon/workflows/process/"
- + PROCESS_NAME + "/logs/job-2010-01-01-01-00/003");
- fs.mkdirs(run3);
- fs.createNewFile(new Path(run3, "oozie.log"));
-
- testProcess = new ProcessEntityParser().parse(LogMoverTest.class
- .getResourceAsStream("/org/apache/falcon/logging/process.xml"));
- testProcess.setName(PROCESS_NAME);
- STORE.publish(EntityType.PROCESS, testProcess);
- }
-
- @BeforeMethod
- public void setInstance() {
- instance = new Instance();
- instance.status = WorkflowStatus.SUCCEEDED;
- instance.instance = "2010-01-01T01:00Z";
- instance.cluster = "testCluster";
- instance.logFile = "http://localhost:15000/oozie/wflog";
- }
-
- private void cleanupStore() throws FalconException {
- for (EntityType type : EntityType.values()) {
- Collection<String> entities = STORE.getEntities(type);
- for (String entity : entities) {
- STORE.remove(type, entity);
- }
- }
- }
-
- @Test
- public void testLogProviderWithValidRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- Instance instanceWithLog = provider.populateLogUrls(testProcess,
- instance, "0");
- Assert.assertEquals(
- instance.logFile,
- "http://localhost:50070/data/workflow/staging/falcon/workflows/process/testProcess/logs/job-2010-01"
- + "-01-01-00/000/oozie.log");
-
- InstanceAction action = instanceWithLog.actions[0];
- Assert.assertEquals(action.action, "mr_Action2");
- Assert.assertEquals(action.status, "SUCCEEDED");
- Assert.assertEquals(
- action.logFile,
- "http://localhost:50070/data/workflow/staging/falcon/workflows/process/testProcess/logs/job-2010-01"
- + "-01-01-00/000/mr_Action2_SUCCEEDED.log");
-
- action = instanceWithLog.actions[1];
- Assert.assertEquals(action.action, "mr_Action");
- Assert.assertEquals(action.status, "FAILED");
- Assert.assertEquals(
- action.logFile,
- "http://localhost:50070/data/workflow/staging/falcon/workflows/process/testProcess/logs/job-2010-01"
- + "-01-01-00/000/mr_Action_FAILED.log");
- }
-
- @Test
- public void testLogProviderWithInvalidRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- provider.populateLogUrls(testProcess, instance, "x");
- Assert.assertEquals(instance.logFile,
- "http://localhost:15000/oozie/wflog");
- }
-
- @Test
- public void testLogProviderWithUnavailableRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- instance.logFile = null;
- provider.populateLogUrls(testProcess, instance, "7");
- Assert.assertEquals(instance.logFile, "-");
- }
-
- @Test
- public void testLogProviderWithEmptyRunId() throws FalconException {
- LogProvider provider = new LogProvider();
- instance.logFile = null;
- provider.populateLogUrls(testProcess, instance, null);
- Assert.assertEquals(
- instance.logFile,
- "http://localhost:50070/data/workflow/staging/falcon/workflows/process/testProcess/logs/"
- + "job-2010-01-01-01-00/003/oozie.log");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/61417357/webapp/src/test/java/org/apache/falcon/resource/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestBase.java b/webapp/src/test/java/org/apache/falcon/resource/AbstractTestBase.java
deleted file mode 100644
index 3ddd282..0000000
--- a/webapp/src/test/java/org/apache/falcon/resource/AbstractTestBase.java
+++ /dev/null
@@ -1,410 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.resource;
-
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.cluster.util.StandAloneCluster;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.util.EmbeddedServer;
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.workflow.engine.OozieClientFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.oozie.client.*;
-import org.apache.oozie.client.Job.Status;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeClass;
-
-import javax.servlet.ServletInputStream;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.UriBuilder;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import java.io.*;
-import java.util.*;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Base test class for CLI, Entity and Process Instances.
- */
-public class AbstractTestBase {
- protected static final String FEED_TEMPLATE1 = "/feed-template1.xml";
- protected static final String FEED_TEMPLATE2 = "/feed-template2.xml";
-
- protected String clusterFileTemplate = "/cluster-template.xml";
-
- protected static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
- protected static final String PROCESS_TEMPLATE = "/process-template.xml";
-
- protected static final String BASE_URL = "http://localhost:15000/";
- protected static final String REMOTE_USER = System.getProperty("user.name");
-
- protected EmbeddedServer server;
-
- protected Unmarshaller unmarshaller;
- protected Marshaller marshaller;
-
- protected EmbeddedCluster cluster;
- protected WebResource service = null;
- protected String clusterName;
- protected String processName;
- protected String outputFeedName;
-
- private static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_]*##");
-
- protected void scheduleProcess(String processTemplate, Map<String, String> overlay) throws Exception {
- ClientResponse response = submitToFalcon(clusterFileTemplate, overlay, EntityType.CLUSTER);
- assertSuccessful(response);
-
- response = submitToFalcon(FEED_TEMPLATE1, overlay, EntityType.FEED);
- assertSuccessful(response);
-
- response = submitToFalcon(FEED_TEMPLATE2, overlay, EntityType.FEED);
- assertSuccessful(response);
-
- response = submitToFalcon(processTemplate, overlay, EntityType.PROCESS);
- assertSuccessful(response);
- ClientResponse clientRepsonse = this.service.path("api/entities/schedule/process/" + processName)
- .header("Remote-User", REMOTE_USER).accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(
- ClientResponse.class);
- assertSuccessful(clientRepsonse);
- }
-
- protected void scheduleProcess() throws Exception {
- scheduleProcess(PROCESS_TEMPLATE, getUniqueOverlay());
- }
-
- private List<WorkflowJob> getRunningJobs(String entityName) throws Exception {
- OozieClient ozClient = OozieClientFactory.get(
- (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
- StringBuilder builder = new StringBuilder();
- builder.append(OozieClient.FILTER_STATUS).append('=').append(Job.Status.RUNNING).append(';');
- builder.append(OozieClient.FILTER_NAME).append('=').append("FALCON_PROCESS_DEFAULT_").append(entityName);
- return ozClient.getJobsInfo(builder.toString());
- }
-
- protected void waitForWorkflowStart(String entityName) throws Exception {
- for (int i = 0; i < 10; i++) {
- List<WorkflowJob> jobs = getRunningJobs(entityName);
- if (jobs != null && !jobs.isEmpty()) {
- return;
- }
-
- System.out.println("Waiting for workflow to start");
- Thread.sleep(i * 1000);
- }
- throw new Exception("Workflow for " + entityName + " hasn't started in oozie");
- }
-
- protected void waitForProcessWFtoStart() throws Exception {
- waitForWorkflowStart(processName);
- }
-
- protected void waitForOutputFeedWFtoStart() throws Exception {
- waitForWorkflowStart(outputFeedName);
- }
-
- protected void waitForBundleStart(Status status) throws Exception {
- OozieClient ozClient = OozieClientFactory.get(clusterName);
- List<BundleJob> bundles = getBundles();
- if (bundles.isEmpty()) {
- return;
- }
-
- String bundleId = bundles.get(0).getId();
- for (int i = 0; i < 15; i++) {
- Thread.sleep(i * 1000);
- BundleJob bundle = ozClient.getBundleJobInfo(bundleId);
- if (bundle.getStatus() == status) {
- if (status == Status.FAILED) {
- return;
- }
-
- boolean done = false;
- for (CoordinatorJob coord : bundle.getCoordinators()) {
- if (coord.getStatus() == status) {
- done = true;
- }
- }
- if (done) {
- return;
- }
- }
- System.out.println("Waiting for bundle " + bundleId + " in " + status + " state");
- }
- throw new Exception("Bundle " + bundleId + " is not " + status + " in oozie");
- }
-
- public AbstractTestBase() {
- try {
- JAXBContext jaxbContext = JAXBContext.newInstance(APIResult.class, Feed.class, Process.class, Cluster.class,
- InstancesResult.class);
- unmarshaller = jaxbContext.createUnmarshaller();
- marshaller = jaxbContext.createMarshaller();
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @BeforeClass
- public void configure() throws Exception {
- StartupProperties.get().setProperty(
- "application.services",
- StartupProperties.get().getProperty("application.services")
- .replace("org.apache.falcon.service.ProcessSubscriberService", ""));
- String store = StartupProperties.get().getProperty("config.store.uri");
- StartupProperties.get().setProperty("config.store.uri", store + System.currentTimeMillis());
- if (new File("webapp/src/main/webapp").exists()) {
- this.server = new EmbeddedServer(15000, "webapp/src/main/webapp");
- } else if (new File("src/main/webapp").exists()) {
- this.server = new EmbeddedServer(15000, "src/main/webapp");
- } else {
- throw new RuntimeException("Cannot run jersey tests");
- }
- ClientConfig config = new DefaultClientConfig();
- Client client = Client.create(config);
- this.service = client.resource(UriBuilder.fromUri(BASE_URL).build());
- this.server.start();
-
- if (System.getProperty("falcon.test.hadoop.embedded", "true").equals("true")) {
- clusterFileTemplate = "target/cluster-template.xml";
- this.cluster = EmbeddedCluster.newCluster("##cluster##", true);
- Cluster clusterEntity = this.cluster.getCluster();
- FileOutputStream out = new FileOutputStream(clusterFileTemplate);
- marshaller.marshal(clusterEntity, out);
- out.close();
- } else {
- Map<String, String> overlay = new HashMap<String, String>();
- overlay.put("cluster", RandomStringUtils.randomAlphabetic(5));
- String file = overlayParametersOverTemplate(clusterFileTemplate, overlay);
- this.cluster = StandAloneCluster.newCluster(file);
- clusterName = cluster.getCluster().getName();
- }
-
- cleanupStore();
-
- // setup dependent workflow and lipath in hdfs
- FileSystem fs = FileSystem.get(this.cluster.getConf());
- fs.mkdirs(new Path("/falcon"), new FsPermission((short) 511));
-
- Path wfParent = new Path("/falcon/test");
- fs.delete(wfParent, true);
- Path wfPath = new Path(wfParent, "workflow");
- fs.mkdirs(wfPath);
- fs.copyFromLocalFile(false, true, new Path(this.getClass().getResource("/fs-workflow.xml").getPath()),
- new Path(wfPath,
- "workflow.xml"));
- fs.mkdirs(new Path(wfParent, "input/2012/04/20/00"));
- Path outPath = new Path(wfParent, "output");
- fs.mkdirs(outPath);
- fs.setPermission(outPath, new FsPermission((short) 511));
- }
-
- /**
- * Converts a InputStream into ServletInputStream.
- *
- * @param fileName
- * @return ServletInputStream
- * @throws java.io.IOException
- */
- protected ServletInputStream getServletInputStream(String fileName) throws IOException {
- return getServletInputStream(new FileInputStream(fileName));
- }
-
- protected ServletInputStream getServletInputStream(final InputStream stream) {
- return new ServletInputStream() {
-
- @Override
- public int read() throws IOException {
- return stream.read();
- }
- };
- }
-
- public void tearDown() throws Exception {
- this.cluster.shutdown();
- server.stop();
- }
-
- public void cleanupStore() throws Exception {
- for (EntityType type : EntityType.values()) {
- for (String name : ConfigurationStore.get().getEntities(type)) {
- ConfigurationStore.get().remove(type, name);
- }
- }
- }
-
- protected ClientResponse submitAndSchedule(String template, Map<String, String> overlay, EntityType entityType)
- throws Exception {
- String tmpFile = overlayParametersOverTemplate(template, overlay);
- ServletInputStream rawlogStream = getServletInputStream(tmpFile);
-
- return this.service.path("api/entities/submitAndSchedule/" + entityType.name().toLowerCase())
- .header("Remote-User", "testuser").accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
- .post(ClientResponse.class, rawlogStream);
- }
-
- protected ClientResponse submitToFalcon(String template, Map<String, String> overlay, EntityType entityType)
- throws IOException {
- String tmpFile = overlayParametersOverTemplate(template, overlay);
- return submitFileToFalcon(entityType, tmpFile);
- }
-
- private ClientResponse submitFileToFalcon(EntityType entityType, String tmpFile) throws IOException {
-
- ServletInputStream rawlogStream = getServletInputStream(tmpFile);
-
- return this.service.path("api/entities/submit/" + entityType.name().toLowerCase()).header("Remote-User",
- "testuser")
- .accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML).post(ClientResponse.class, rawlogStream);
- }
-
- protected void assertRequestId(ClientResponse clientRepsonse) {
- String response = clientRepsonse.getEntity(String.class);
- try {
- APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response));
- Assert.assertNotNull(result.getRequestId());
- } catch (JAXBException e) {
- Assert.fail("Reponse " + response + " is not valid");
- }
- }
-
- protected void assertStatus(ClientResponse clientRepsonse, APIResult.Status status) {
- String response = clientRepsonse.getEntity(String.class);
- try {
- APIResult result = (APIResult) unmarshaller.unmarshal(new StringReader(response));
- Assert.assertEquals(result.getStatus(), status);
- } catch (JAXBException e) {
- Assert.fail("Reponse " + response + " is not valid");
- }
- }
-
- protected void assertFailure(ClientResponse clientRepsonse) {
- Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
- assertStatus(clientRepsonse, APIResult.Status.FAILED);
- }
-
- protected void assertSuccessful(ClientResponse clientRepsonse) {
- Assert.assertEquals(clientRepsonse.getStatus(), Response.Status.OK.getStatusCode());
- assertStatus(clientRepsonse, APIResult.Status.SUCCEEDED);
- }
-
- protected String overlayParametersOverTemplate(String template, Map<String, String> overlay) throws IOException {
- File tmpFile = getTempFile();
- OutputStream out = new FileOutputStream(tmpFile);
-
- InputStreamReader in;
- if (getClass().getResourceAsStream(template) == null) {
- in = new FileReader(template);
- } else {
- in = new InputStreamReader(getClass().getResourceAsStream(template));
- }
- BufferedReader reader = new BufferedReader(in);
- String line;
- while ((line = reader.readLine()) != null) {
- Matcher matcher = VAR_PATTERN.matcher(line);
- while (matcher.find()) {
- String variable = line.substring(matcher.start(), matcher.end());
- line = line.replace(variable, overlay.get(variable.substring(2, variable.length() - 2)));
- matcher = VAR_PATTERN.matcher(line);
- }
- out.write(line.getBytes());
- out.write("\n".getBytes());
- }
- reader.close();
- out.close();
- return tmpFile.getAbsolutePath();
- }
-
- protected File getTempFile() throws IOException {
- File target = new File("webapp/target");
- if (!target.exists()) {
- target = new File("target");
- }
-
- return File.createTempFile("test", ".xml", target);
- }
-
- protected List<BundleJob> getBundles() throws Exception {
- List<BundleJob> bundles = new ArrayList<BundleJob>();
- if (clusterName == null) {
- return bundles;
- }
-
- OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
- return ozClient.getBundleJobsInfo("name=FALCON_PROCESS_" + processName, 0, 10);
- }
-
- @AfterClass
- public void cleanup() throws Exception {
- tearDown();
- cleanupStore();
- }
-
- @AfterMethod
- public boolean killOozieJobs() throws Exception {
- if (clusterName == null) {
- return true;
- }
-
- OozieClient ozClient = OozieClientFactory.get(cluster.getCluster());
- List<BundleJob> bundles = getBundles();
- if (bundles != null) {
- for (BundleJob bundle : bundles) {
- ozClient.kill(bundle.getId());
- }
- }
- return false;
- }
-
- protected Map<String, String> getUniqueOverlay() throws FalconException {
- Map<String, String> overlay = new HashMap<String, String>();
- long time = System.currentTimeMillis();
- clusterName = "cluster" + time;
- overlay.put("cluster", clusterName);
- overlay.put("inputFeedName", "in" + time);
- //only feeds with future dates can be scheduled
- Date endDate = new Date(System.currentTimeMillis() + 15 * 60 * 1000);
- overlay.put("feedEndDate", SchemaHelper.formatDateUTC(endDate));
- overlay.put("outputFeedName", "out" + time);
- processName = "p" + time;
- overlay.put("processName", processName);
- outputFeedName = "out" + time;
- return overlay;
- }
-}