You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@falcon.apache.org by ro...@apache.org on 2014/12/22 13:08:57 UTC
incubator-falcon git commit: FALCON-948 Enabling late rerun tests.
Contributed by Paul Isaychuk
Repository: incubator-falcon
Updated Branches:
refs/heads/master c682b34df -> c1ac6e6af
FALCON-948 Enabling late rerun tests. Contributed by Paul Isaychuk
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/c1ac6e6a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/c1ac6e6a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/c1ac6e6a
Branch: refs/heads/master
Commit: c1ac6e6afe23bfba8e3cd8b567bb83ee2abf1bc3
Parents: c682b34
Author: Ruslan Ostafiychuk <ro...@apache.org>
Authored: Mon Dec 22 14:08:11 2014 +0200
Committer: Ruslan Ostafiychuk <ro...@apache.org>
Committed: Mon Dec 22 14:08:11 2014 +0200
----------------------------------------------------------------------
falcon-regression/CHANGES.txt | 2 +
.../falcon/regression/core/util/HadoopUtil.java | 31 +++++++-
.../regression/core/util/InstanceUtil.java | 15 ----
.../falcon/regression/core/util/OozieUtil.java | 8 +-
.../falcon/regression/FeedLateRerunTest.java | 77 ++++----------------
.../regression/ProcessInstanceKillsTest.java | 2 +-
.../falcon/regression/ProcessLateRerunTest.java | 14 +---
7 files changed, 49 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/CHANGES.txt
----------------------------------------------------------------------
diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt
index b299b40..10795e1 100644
--- a/falcon-regression/CHANGES.txt
+++ b/falcon-regression/CHANGES.txt
@@ -184,6 +184,8 @@ Trunk (Unreleased)
FALCON-681 delete duplicate feed retention test from falcon regression (SamarthG)
BUG FIXES
+ FALCON-948 Enabling late rerun tests (Paul Isaychuk via Ruslan Ostafiychuk)
+
FALCON-956 Fix testProcessInstanceStatusTimedOut (Paul Isaychuk via Raghav Kumar Gautam)
FALCON-937 Fix tests that are still using hdfs root dir in feeds (Raghav Kumar Gautam
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
index 3cb4f94..64574a5 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
@@ -20,6 +20,8 @@ package org.apache.falcon.regression.core.util;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,7 +51,7 @@ public final class HadoopUtil {
/*
* Removes 'hdfs(hftp)://server:port'
*/
- private static String cutProtocol(String path) {
+ public static String cutProtocol(String path) {
if (StringUtils.isNotEmpty(path)) {
if (protocol.matcher(path).find()) {
return '/' + protocol.split(path)[1];
@@ -146,12 +148,12 @@ public final class HadoopUtil {
* @param srcFileLocation source location
* @throws IOException
*/
- public static void copyDataToFolder(final FileSystem fs, final String dstHdfsDir,
+ public static void copyDataToFolder(final FileSystem fs, String dstHdfsDir,
final String srcFileLocation)
throws IOException {
LOGGER.info(String.format("Copying local dir %s to hdfs location %s on %s",
srcFileLocation, dstHdfsDir, fs.getUri()));
- fs.copyFromLocalFile(new Path(srcFileLocation), new Path(dstHdfsDir));
+ fs.copyFromLocalFile(new Path(srcFileLocation), new Path(cutProtocol(dstHdfsDir)));
}
/**
@@ -474,4 +476,27 @@ public final class HadoopUtil {
copyDataToFolders(fs, folderPrefix, folderPaths,
OSUtil.NORMAL_INPUT + "_SUCCESS", OSUtil.NORMAL_INPUT + "log_01.txt");
}
+
+ /**
+ * Creates empty folders on the HDFS of the given colo; null or empty entries are skipped.
+ * @param helper colo whose feed helper's Hadoop URL selects the target file system
+ * @param folderList folder paths; any 'hdfs(hftp)://server:port' prefix is removed first
+ * @throws IOException propagated from FileSystem.get or FileSystem.mkdirs
+ * @deprecated method creates filesystem object by itself. We should pass an existing FileSystem
+ * object to such methods instead.
+ */
+ @Deprecated
+ public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
+ throws IOException {
+ LOGGER.info("creating folders.....");
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "hdfs://" + helper.getFeedHelper().getHadoopURL()); // NOTE(review): legacy key; newer Hadoop names it fs.defaultFS — confirm
+ final FileSystem fs = FileSystem.get(conf);
+ for (final String folder : folderList) {
+ if (StringUtils.isNotEmpty(folder)) { // skips null and "" entries
+ fs.mkdirs(new Path(cutProtocol(folder))); // strip scheme/authority so the path resolves on fs
+ }
+ }
+ LOGGER.info("created folders.....");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
index 4f9bd9d..ce9e15b 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java
@@ -568,21 +568,6 @@ public final class InstanceUtil {
return actions.get(instanceNumber).getStatus();
}
-
- public static void createHDFSFolders(ColoHelper helper, List<String> folderList)
- throws IOException {
- LOGGER.info("creating folders.....");
- Configuration conf = new Configuration();
- conf.set("fs.default.name", "hdfs://" + helper.getFeedHelper().getHadoopURL());
- final FileSystem fs = FileSystem.get(conf);
- for (final String folder : folderList) {
- if (StringUtils.isNotEmpty(folder)) {
- fs.mkdirs(new Path(folder));
- }
- }
- LOGGER.info("created folders.....");
- }
-
/**
* Retrieves replication coordinatorID from bundle of coordinators.
*/
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
index 95919ee..3098729 100644
--- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/OozieUtil.java
@@ -466,8 +466,8 @@ public final class OozieUtil {
String bundleID = InstanceUtil.getSequenceBundleID(helper, entityName, type, bundleNumber);
OozieClient oozieClient = helper.getClusterHelper().getOozieClient();
List<CoordinatorJob> coords = oozieClient.getBundleJobInfo(bundleID).getCoordinators();
- InstanceUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
- instanceNumber));
+ HadoopUtil.createHDFSFolders(helper, getMissingDependenciesForInstance(oozieClient, coords,
+ instanceNumber));
}
private static List<String> getMissingDependenciesForInstance(OozieClient oozieClient,
@@ -501,8 +501,8 @@ public final class OozieUtil {
for (int instanceNumber = 0; instanceNumber < temp.getActions().size();
instanceNumber++) {
CoordinatorAction instance = temp.getActions().get(instanceNumber);
- InstanceUtil.createHDFSFolders(helper,
- Arrays.asList(instance.getMissingDependencies().split("#")));
+ HadoopUtil.createHDFSFolders(helper,
+ Arrays.asList(instance.getMissingDependencies().split("#")));
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
index da38085..d0a6dde 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java
@@ -47,6 +47,7 @@ import java.util.List;
* On adding further late data it checks whether the data has been replicated correctly in the given late cut-off time.
* Assuming that late frequency set in server is 3 minutes. Although value can be changed according to requirement.
*/
+@Test(groups = "embedded")
public class FeedLateRerunTest extends BaseTestClass {
private ColoHelper cluster1 = servers.get(0);
@@ -67,10 +68,8 @@ public class FeedLateRerunTest extends BaseTestClass {
public void setUp(Method method) throws JAXBException, IOException {
LOGGER.info("test name: " + method.getName());
Bundle bundle = BundleUtil.readFeedReplicationBundle();
-
bundles[0] = new Bundle(bundle, cluster1);
bundles[1] = new Bundle(bundle, cluster2);
-
bundles[0].generateUniqueBundle();
bundles[1].generateUniqueBundle();
}
@@ -109,29 +108,22 @@ public class FeedLateRerunTest extends BaseTestClass {
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
.build()).toString();
-
String entityName = Util.readEntityName(feed);
//submit and schedule feed
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
//check if coordinator exists
-
InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
-
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
- "REPLICATION"), 1);
-
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
//Finding bundleId of replicated instance on target
String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
-
//Finding and creating missing dependencies
List<String> missingDependencies = getAndCreateDependencies(
cluster1, cluster1FS, cluster2, cluster2OC, bundleId, false, entityName);
-
int count = 1;
for (String location : missingDependencies) {
if (count==1) {
@@ -139,40 +131,28 @@ public class FeedLateRerunTest extends BaseTestClass {
count++;
}
}
-
source=splitPathFromIp(source, "8020");
LOGGER.info("source : " + source);
target = source.replace("source", "target");
LOGGER.info("target : " + target);
-
-
- /*
- Sleep for some time ( as is defined in runtime property of server ).
- Let the instance rerun and then it should succeed.
- */
-
+ /* Sleep for some time ( as is defined in runtime property of server ).
+ Let the instance rerun and then it should succeed.*/
int sleepMins = 8;
for(int i=0; i < sleepMins; i++) {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
-
-
String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1);
//check if data has been replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+ .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
-
+ .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
-
}
-
@Test(enabled = true)
public void feedLateRerunTestWithData()
throws AuthenticationException, IOException, URISyntaxException, JAXBException,
@@ -202,20 +182,15 @@ public class FeedLateRerunTest extends BaseTestClass {
.withClusterType(ClusterType.TARGET)
.withDataLocation(targetDataLocation)
.build()).toString();
-
String entityName = Util.readEntityName(feed);
//submit and schedule feed
AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed));
//check if coordinator exists
-
InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0);
-
Assert.assertEquals(InstanceUtil
- .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName,
- "REPLICATION"), 1);
-
+ .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1);
//Finding bundleId of replicated instance on target
String bundleId = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
@@ -223,7 +198,6 @@ public class FeedLateRerunTest extends BaseTestClass {
//Finding and creating missing dependencies
List<String> missingDependencies = getAndCreateDependencies(
cluster1, cluster1FS, cluster2, cluster2OC, bundleId, true, entityName);
-
int count = 1;
for (String location : missingDependencies) {
if (count==1) {
@@ -231,37 +205,27 @@ public class FeedLateRerunTest extends BaseTestClass {
count++;
}
}
-
LOGGER.info("source : " + source);
source=splitPathFromIp(source, "8020");
LOGGER.info("source : " + source);
target = source.replace("source", "target");
LOGGER.info("target : " + target);
-
-
- /*
- Sleep for some time ( as is defined in runtime property of server ).
- Let the instance rerun and then it should succeed.
- */
-
+ /* Sleep for some time ( as is defined in runtime property of server ).
+ Let the instance rerun and then it should succeed.*/
int sleepMins = 8;
for(int i=0; i < sleepMins; i++) {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
-
-
String bundleID = InstanceUtil.getLatestBundleID(cluster2, entityName, EntityType.FEED);
OozieUtil.validateRetryAttempts(cluster2, bundleID, EntityType.FEED, 1);
//check if data has been replicated correctly
List<Path> cluster1ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster1FS, new Path(source));
+ .getAllFilesRecursivelyHDFS(cluster1FS, new Path(HadoopUtil.cutProtocol(source)));
List<Path> cluster2ReplicatedData = HadoopUtil
- .getAllFilesRecursivelyHDFS(cluster2FS, new Path(target));
-
+ .getAllFilesRecursivelyHDFS(cluster2FS, new Path(HadoopUtil.cutProtocol(target)));
AssertUtil.checkForListSizes(cluster1ReplicatedData, cluster2ReplicatedData);
-
}
private String splitPathFromIp(String src, String port) {
@@ -274,7 +238,6 @@ public class FeedLateRerunTest extends BaseTestClass {
}
}
}
-
if (tempSrc.isEmpty()) {
reqSrc = src;
} else {
@@ -283,16 +246,12 @@ public class FeedLateRerunTest extends BaseTestClass {
return reqSrc;
}
- /*
- prismHelper1 - source colo
- prismHelper2 - target colo
- */
+ /* prismHelper1 - source colo, prismHelper2 - target colo */
private List<String> getAndCreateDependencies(ColoHelper prismHelper1, FileSystem clusterFS1,
ColoHelper prismHelper2,
OozieClient oozieClient2, String bundleId,
boolean dataFlag, String entityName)
throws OozieClientException, IOException {
-
List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
for (int i = 0; i < 10 && missingDependencies == null; ++i) {
TimeUtil.sleepSeconds(30);
@@ -300,15 +259,12 @@ public class FeedLateRerunTest extends BaseTestClass {
missingDependencies = OozieUtil.getMissingDependencies(prismHelper2, bundleId);
}
Assert.assertNotNull(missingDependencies, "Missing dependencies not found.");
-
//print missing dependencies
for (String dependency : missingDependencies) {
LOGGER.info("dependency from job: " + dependency);
}
-
// Creating missing dependencies
- InstanceUtil.createHDFSFolders(prismHelper1, missingDependencies);
-
+ HadoopUtil.createHDFSFolders(prismHelper1, missingDependencies);
//Adding data to empty folders depending on dataFlag
if (dataFlag) {
int tempCount = 1;
@@ -320,13 +276,10 @@ public class FeedLateRerunTest extends BaseTestClass {
}
}
}
-
//replication should start, wait while it ends
InstanceUtil.waitTillInstanceReachState(oozieClient2, entityName, 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.FEED);
-
// Adding data for late rerun
-
int tempCounter = 1;
for (String dependency : missingDependencies) {
if (tempCounter==1) {
@@ -335,10 +288,6 @@ public class FeedLateRerunTest extends BaseTestClass {
}
tempCounter++;
}
-
return missingDependencies;
-
}
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
index e1a4dd4..88c52d9 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessInstanceKillsTest.java
@@ -162,7 +162,7 @@ public class ProcessInstanceKillsTest extends BaseTestClass {
if (actions.size() == 6) {
for(int i = 0; i < 5; i++) {
CoordinatorAction action = actions.get(i);
- InstanceUtil.createHDFSFolders(cluster, Arrays
+ HadoopUtil.createHDFSFolders(cluster, Arrays
.asList(action.getMissingDependencies().split("#")));
}
break;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/c1ac6e6a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
index 488cf74..6804b8d 100644
--- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
+++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ProcessLateRerunTest.java
@@ -42,10 +42,8 @@ import java.util.*;
/**
* Process late data test.
*/
-
+@Test(groups = "embedded")
public class ProcessLateRerunTest extends BaseTestClass {
-
-
private ColoHelper cluster1 = servers.get(0);
private OozieClient cluster1OC = serverOC.get(0);
private FileSystem cluster1FS = serverFS.get(0);
@@ -79,7 +77,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
* It checks the number of rerun attempts once late data has been added
* ensuring that late rerun happened.
*/
-
@Test(enabled = true)
public void testProcessLateRerunOnEmptyFolder() throws Exception {
String startTime = TimeUtil.getTimeWrtSystemTime(0);
@@ -106,7 +103,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
-
InstanceUtil.waitTillInstanceReachState(cluster1OC,
Util.getProcessName(bundles[0].getProcessData()), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -116,7 +112,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
}
/**
@@ -150,7 +145,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
-
InstanceUtil.waitTillInstanceReachState(cluster1OC,
Util.getProcessName(bundles[0].getProcessData()), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -160,7 +154,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
}
/**
@@ -198,7 +191,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
LOGGER.info("Waiting...");
TimeUtil.sleepSeconds(60);
}
-
InstanceUtil.waitTillInstanceReachState(cluster1OC,
Util.getProcessName(bundles[0].getProcessData()), 1,
CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS);
@@ -208,7 +200,6 @@ public class ProcessLateRerunTest extends BaseTestClass {
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 1);
-
}
/**
@@ -267,14 +258,12 @@ public class ProcessLateRerunTest extends BaseTestClass {
String bundleID = bundleList.get(0);
OozieUtil.validateRetryAttempts(cluster1, bundleID, EntityType.PROCESS, 0);
-
}
/*
dataFlag - denotes whether process should run initially on empty folders or folders containing data
dataFolder - denotes the folder where you want to upload data for late rerun
*/
-
private void getAndCreateDependencies(ColoHelper prismHelper, Bundle bundle,
OozieClient oozieClient, FileSystem clusterFS,
boolean dataFlag, int dataFolder) {
@@ -355,5 +344,4 @@ public class ProcessLateRerunTest extends BaseTestClass {
lateProcess.getLateInputs().add(lateInput);
return lateProcess;
}
-
}