You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sa...@apache.org on 2014/08/04 12:04:21 UTC
[22/27] adding falcon-regression
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
new file mode 100644
index 0000000..a35cd2d
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/response/lineage/VerticesResult.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.response.lineage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Graph response that carries a list of vertices, plus helpers that filter a vertex list
+ * by vertex type or by vertex name.
+ */
+public class VerticesResult extends GraphResult {
+    // Package-private and never assigned in this class; presumably filled in by the
+    // response deserializer - TODO confirm.
+    List<Vertex> results;
+
+    /**
+     * @return the vertices of this result; null if they were never populated
+     */
+    public List<Vertex> getResults() {
+        return results;
+    }
+
+    @Override
+    public String toString() {
+        return "AllVertices{"
+            + "totalSize=" + totalSize
+            + ", results=" + results
+            + '}';
+    }
+
+    /**
+     * Filters the vertices of this result by type.
+     *
+     * @param vertex_type type to keep
+     * @return new list with the vertices of the given type, empty if there are none
+     */
+    public List<Vertex> filterByType(Vertex.VERTEX_TYPE vertex_type) {
+        return filterVerticesByType(vertex_type, results);
+    }
+
+    /**
+     * Filters the supplied vertices by type.
+     *
+     * @param vertex_type type to keep
+     * @param vertexList vertices to filter; null is treated as an empty list
+     * @return new list with the vertices of the given type, in their original order
+     */
+    public List<Vertex> filterVerticesByType(Vertex.VERTEX_TYPE vertex_type,
+                                             List<Vertex> vertexList) {
+        final List<Vertex> result = new ArrayList<Vertex>();
+        if (vertexList == null) {
+            return result;
+        }
+        for (Vertex vertex : vertexList) {
+            if (vertex.getType() == vertex_type) {
+                result.add(vertex);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Filters the vertices of this result by name.
+     *
+     * @param name name to look for
+     * @return new list with the vertices carrying that name, empty if there are none
+     */
+    public List<Vertex> filterByName(String name) {
+        return filterVerticesByName(name, results);
+    }
+
+    /**
+     * Filters the supplied vertices by name.
+     *
+     * @param name name to look for; a null name only matches vertices whose name is null
+     * @param vertexList vertices to filter; null is treated as an empty list
+     * @return new list with the vertices carrying that name, in their original order
+     */
+    public List<Vertex> filterVerticesByName(String name, List<Vertex> vertexList) {
+        final List<Vertex> result = new ArrayList<Vertex>();
+        if (vertexList == null) {
+            return result;
+        }
+        for (Vertex vertex : vertexList) {
+            final String vertexName = vertex.getName();
+            // Compare null-safely: a vertex without a name must not abort the whole filter.
+            final boolean matches = (name == null) ? vertexName == null : name.equals(vertexName);
+            if (matches) {
+                result.add(vertex);
+            }
+        }
+        return result;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
new file mode 100755
index 0000000..77a5cc6
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/Brother.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.falcon.regression.core.util.Util.URLS;
+import org.testng.TestNGException;
+import org.apache.log4j.Logger;
+
+public class Brother extends Thread {
+ String operation;
+ String data;
+ URLS url;
+ ServiceResponse output;
+ private static final Logger logger = Logger.getLogger(Brother.class);
+
+ public ServiceResponse getOutput() {
+ return output;
+ }
+
+ IEntityManagerHelper entityManagerHelper;
+
+ public Brother(String threadName, String operation, EntityType entityType, ThreadGroup tGroup,
+ Bundle b, ColoHelper p, URLS url) {
+ super(tGroup, threadName);
+ this.operation = operation;
+ switch (entityType) {
+ case PROCESS:
+ this.data = b.getProcessData();
+ this.entityManagerHelper = p.getProcessHelper();
+ break;
+ case CLUSTER:
+ this.entityManagerHelper = p.getClusterHelper();
+ this.data = b.getClusters().get(0);
+ break;
+ case FEED:
+ this.entityManagerHelper = p.getFeedHelper();
+ this.data = b.getDataSets().get(0);
+ break;
+ default:
+ logger.error("Unexpected entityType=" + entityType);
+ }
+ this.url = url;
+ this.output = new ServiceResponse();
+ }
+
+ public void run() {
+ try {
+ sleep(50L);
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new TestNGException(e.getMessage());
+ }
+ logger.info("Brother " + this.getName() + " will be executing " + operation);
+ try {
+ switch (url) {
+ case SUBMIT_URL:
+ output = entityManagerHelper.submitEntity(url, data);
+ break;
+ case GET_ENTITY_DEFINITION:
+ output = entityManagerHelper.getEntityDefinition(url, data);
+ break;
+ case DELETE_URL:
+ output = entityManagerHelper.delete(url, data);
+ break;
+ case SUSPEND_URL:
+ output = entityManagerHelper.suspend(url, data);
+ break;
+ case SCHEDULE_URL:
+ output = entityManagerHelper.schedule(url, data);
+ break;
+ case RESUME_URL:
+ output = entityManagerHelper.resume(url, data);
+ break;
+ case SUBMIT_AND_SCHEDULE_URL:
+ output = entityManagerHelper.submitAndSchedule(url, data);
+ break;
+ case STATUS_URL:
+ output = entityManagerHelper.getStatus(url, data);
+ break;
+ default:
+ logger.error("Unexpected url: " + url);
+ break;
+ }
+ logger.info("Brother " + getName() + "'s response to the " + operation + " is: " +
+ output);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
new file mode 100644
index 0000000..0ebcb53
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/HadoopFileEditor.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.regression.core.util.Util;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedWriter;
+import java.io.BufferedReader;
+import java.io.FileReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Edits text files that live on a Hadoop file system by copying them into the local working
+ * directory, changing them there and uploading the result. A local {@code .bck} copy of each
+ * edited file is kept so that {@link #restore()} can put the original content back.
+ */
+public class HadoopFileEditor {
+    private static final Logger logger = Logger.getLogger(HadoopFileEditor.class);
+    FileSystem fs;
+    // paths.get(i) is the remote location, files.get(i) the matching local file name.
+    List<String> paths;
+    List<String> files;
+
+    public HadoopFileEditor(FileSystem fs) {
+        this.fs = fs;
+        paths = new ArrayList<String>();
+        files = new ArrayList<String>();
+    }
+
+    /**
+     * Inserts text into a file on the file system. The text is written once, directly after
+     * the first line that contains {@code putAfterString}; it is written verbatim, so it must
+     * bring its own trailing line break if one is wanted. If the file does not exist nothing
+     * is changed and nothing is recorded for {@link #restore()}.
+     *
+     * @param path location of the file on the file system
+     * @param putAfterString text marking the line after which the insertion happens
+     * @param toBeInserted text to insert
+     * @throws IOException if copying, reading or writing fails
+     */
+    public void edit(String path, String putAfterString, String toBeInserted) throws IOException {
+        String currentFile = Util.getFileNameFromPath(path);
+        // Clear leftovers of earlier runs from the local working directory.
+        FileUtils.deleteQuietly(new File(currentFile));
+        FileUtils.deleteQuietly(new File("." + currentFile + ".crc"));
+        FileUtils.deleteQuietly(new File(currentFile + ".bck"));
+        FileUtils.deleteQuietly(new File("tmp"));
+
+        Path file = new Path(path);
+        if (!fs.exists(file)) {
+            logger.info("Nothing to do, " + currentFile + " does not exists");
+            return;
+        }
+
+        fs.copyToLocalFile(file, new Path(currentFile));
+        FileUtils.copyFile(new File(currentFile), new File(currentFile + ".bck"));
+        // Record the file only once its backup exists; restore() relies on the backup.
+        paths.add(path);
+        files.add(currentFile);
+
+        insertAfterFirstMatch(currentFile, "tmp", putAfterString, toBeInserted);
+        FileUtils.deleteQuietly(new File(currentFile));
+        FileUtils.copyFile(new File("tmp"), new File(currentFile));
+        FileUtils.deleteQuietly(new File("tmp"));
+
+        fs.delete(file, false);
+        // Drop the checksum file left next to the local copy before uploading the new content.
+        FileUtils.deleteQuietly(new File("." + currentFile + ".crc"));
+        fs.copyFromLocalFile(new Path(currentFile), file);
+    }
+
+    /**
+     * Copies sourceFile to targetFile line by line and writes toBeInserted after the first
+     * line containing putAfterString. Both streams are closed even if copying fails.
+     */
+    private static void insertAfterFirstMatch(String sourceFile, String targetFile,
+                                              String putAfterString, String toBeInserted)
+        throws IOException {
+        BufferedReader reader = new BufferedReader(new FileReader(sourceFile));
+        try {
+            BufferedWriter writer = new BufferedWriter(new FileWriter(targetFile));
+            try {
+                String line;
+                boolean isInserted = false;
+                while ((line = reader.readLine()) != null) {
+                    writer.write(line);
+                    writer.write('\n');
+                    if (!isInserted && line.contains(putAfterString)) {
+                        writer.write(toBeInserted);
+                        isInserted = true;
+                    }
+                }
+            } finally {
+                writer.close();
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /**
+     * Puts back the original content of every file changed by
+     * {@link #edit(String, String, String)} and removes the local copies and backups.
+     *
+     * @throws IOException if a backup cannot be copied or uploaded
+     */
+    public void restore() throws IOException {
+        for (int i = 0; i < paths.size(); i++) {
+            final String remotePath = paths.get(i);
+            final String localFile = files.get(i);
+            fs.delete(new Path(remotePath), false);
+            FileUtils.deleteQuietly(new File(localFile));
+            FileUtils.copyFile(new File(localFile + ".bck"), new File(localFile));
+            fs.copyFromLocalFile(new Path(localFile), new Path(remotePath));
+            FileUtils.deleteQuietly(new File(localFile));
+            FileUtils.deleteQuietly(new File(localFile + ".bck"));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
new file mode 100644
index 0000000..d7e952b
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/supportClasses/JmsMessageConsumer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.supportClasses;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Thread that subscribes to a topic on an ActiveMQ broker and collects the messages it
+ * receives until {@code MAX_MESSAGE_COUNT} receive calls have been made or an error occurs.
+ */
+public class JmsMessageConsumer extends Thread {
+    /*
+     * brokerURL is the URL of the JMS server, for example "tcp://host:61616?daemon=true"
+     * or ActiveMQConnection.DEFAULT_BROKER_URL.
+     * topicName is the name of the topic messages are received from, for example
+     * "IVORY.TOPIC".
+     */
+
+    private static final Logger logger = Logger.getLogger(JmsMessageConsumer.class);
+    private static final int MAX_MESSAGE_COUNT = 1000;
+
+    final String brokerURL;
+    final String topicName;
+    final List<MapMessage> receivedMessages;
+
+    /**
+     * @return the live list the consumer thread appends to (not a copy)
+     */
+    public List<MapMessage> getReceivedMessages() {
+        return receivedMessages;
+    }
+
+    /**
+     * @param topicName topic to subscribe to; also used as the thread name
+     * @param brokerURL URL of the broker to connect to
+     */
+    public JmsMessageConsumer(String topicName, String brokerURL) {
+        super(topicName);
+        this.topicName = topicName;
+        this.brokerURL = brokerURL;
+        receivedMessages = new ArrayList<MapMessage>();
+    }
+
+    /**
+     * Connects to the broker, subscribes to the topic and stores every received message.
+     * Any exception ends the consumption and is logged; the connection is always closed.
+     */
+    @Override
+    public void run() {
+        try {
+            // Getting JMS connection from the server
+            Connection connection = new ActiveMQConnectionFactory(brokerURL).createConnection();
+            try {
+                connection.start();
+
+                // Creating session and consumer for receiving messages from the topic
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Destination destination = session.createTopic(topicName);
+                MessageConsumer consumer = session.createConsumer(destination);
+
+                try {
+                    logger.info("Starting to receive messages.");
+                    int count = 0;
+                    for (; count < MAX_MESSAGE_COUNT; ++count) {
+                        Message message = consumer.receive(); //blocking call
+                        if (message == null) {
+                            logger.info("Received empty message, count = " + count);
+                        } else {
+                            logger.info("Received message, id = " + message.getJMSMessageID());
+                            // A message of another type fails this cast; the resulting
+                            // exception is handled below and ends the consumption.
+                            receivedMessages.add((MapMessage) message);
+                        }
+                    }
+                    // Only reached once the loop has used up all MAX_MESSAGE_COUNT iterations.
+                    logger.warn("Not reading more messages, already read " + count + " messages.");
+                } finally {
+                    logger.info("Stopping to receive messages.");
+                }
+            } finally {
+                // Also runs when start-up fails, so the connection can no longer leak.
+                connection.close();
+            }
+        } catch (Exception e) {
+            logger.info("caught exception: " + ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
new file mode 100644
index 0000000..db73eda
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/AssertUtil.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.response.APIResult;
+import org.apache.falcon.regression.core.response.InstancesResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.oozie.client.Job;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
+import org.testng.Assert;
+
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Util methods for assert.
+ */
+public final class AssertUtil {
+
+    private static final Logger LOGGER = Logger.getLogger(AssertUtil.class);
+
+    private AssertUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /**
+     * Fails if any of the paths contains any of the given strings.
+     *
+     * @param paths list of paths
+     * @param shouldNotBePresent strings that must not occur in any path
+     */
+    public static void failIfStringFoundInPath(
+        List<Path> paths, String... shouldNotBePresent) {
+        for (Path path : paths) {
+            final String pathString = path.toUri().toString();
+            for (String aShouldNotBePresent : shouldNotBePresent) {
+                if (pathString.contains(aShouldNotBePresent)) {
+                    Assert.fail("String " + aShouldNotBePresent + " was not expected in path "
+                        + pathString);
+                }
+            }
+        }
+    }
+
+    /**
+     * Checks that two lists have the same size.
+     *
+     * @param expected expected list
+     * @param actual actual list
+     */
+    public static void checkForListSizes(List<?> expected, List<?> actual) {
+        if (expected.size() != actual.size()) {
+            LOGGER.info("expected = " + expected);
+        }
+        checkForListSize(actual, expected.size());
+    }
+
+    /**
+     * Checks that a list has the expected size; on a mismatch the list is logged first.
+     *
+     * @param elements list of elements
+     * @param expectedSize expected size of the list
+     */
+    public static void checkForListSize(List<?> elements, int expectedSize) {
+        if (elements.size() != expectedSize) {
+            LOGGER.info("expectedSize = " + expectedSize);
+            LOGGER.info("elements.size() = " + elements.size());
+            LOGGER.info("elements = " + elements);
+        }
+        Assert.assertEquals(elements.size(), expectedSize,
+            "Size of expected and actual list don't match.");
+    }
+
+    /**
+     * Checks that two lists differ by the expected number of elements and that the first
+     * differing element contains the given name. The supplied lists are not modified.
+     *
+     * @param initialState first list
+     * @param finalState second list
+     * @param filename text expected in the first differing element
+     * @param expectedDiff diff count (positive for new elements, negative for removed ones)
+     */
+    public static void compareDataStoreStates(List<String> initialState,
+                                              List<String> finalState, String filename,
+                                              int expectedDiff) {
+        final List<String> difference =
+            diffDataStoreStates(initialState, finalState, expectedDiff);
+        Assert.assertEquals(difference.size(), Math.abs(expectedDiff));
+        if (expectedDiff != 0) {
+            Assert.assertTrue(difference.get(0).contains(filename));
+        }
+    }
+
+    /**
+     * Checks that two lists differ by the expected number of elements. The supplied lists
+     * are not modified.
+     *
+     * @param initialState first list
+     * @param finalState second list
+     * @param expectedDiff diff count (positive for new elements, negative for removed ones)
+     */
+    public static void compareDataStoreStates(List<String> initialState,
+                                              List<String> finalState, int expectedDiff) {
+        final List<String> difference =
+            diffDataStoreStates(initialState, finalState, expectedDiff);
+        Assert.assertEquals(difference.size(), Math.abs(expectedDiff));
+    }
+
+    /**
+     * Returns the elements only present in finalState (expectedDiff of zero or more) or only
+     * present in initialState (negative expectedDiff), computed on a copy of the lists.
+     */
+    private static List<String> diffDataStoreStates(List<String> initialState,
+                                                    List<String> finalState, int expectedDiff) {
+        final List<String> difference;
+        if (expectedDiff > -1) {
+            difference = new java.util.ArrayList<String>(finalState);
+            difference.removeAll(initialState);
+        } else {
+            difference = new java.util.ArrayList<String>(initialState);
+            difference.removeAll(finalState);
+        }
+        return difference;
+    }
+
+    /**
+     * Checks that ServiceResponse status is SUCCEEDED.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertSucceeded(ServiceResponse response) throws JAXBException {
+        // Parse once and reuse the result for all checks.
+        final APIResult result = Util.parseResponse(response);
+        Assert.assertEquals(result.getStatus(),
+            APIResult.Status.SUCCEEDED, "Status should be SUCCEEDED");
+        Assert.assertEquals(result.getStatusCode(), 200,
+            "Status code should be 200");
+        Assert.assertNotNull(result.getMessage(), "Status message is null");
+    }
+
+    /**
+     * Checks that InstancesResult status is SUCCEEDED.
+     *
+     * @param response InstancesResult
+     */
+    public static void assertSucceeded(InstancesResult response) {
+        Assert.assertNotNull(response.getMessage());
+        Assert.assertEquals(response.getStatus(), APIResult.Status.SUCCEEDED,
+            "Status should be SUCCEEDED");
+    }
+
+    /**
+     * Checks that ServiceResponse status is FAILED with status code 400.
+     *
+     * @param response ServiceResponse
+     * @param message message for exception
+     * @throws JAXBException
+     */
+    public static void assertFailed(final ServiceResponse response, final String message)
+        throws JAXBException {
+        assertFailedWithStatus(response, 400, message);
+    }
+
+    /**
+     * Checks that ServiceResponse status is FAILED with some status code.
+     *
+     * @param response ServiceResponse
+     * @param statusCode expected status code
+     * @param message message for exception
+     * @throws JAXBException
+     */
+    public static void assertFailedWithStatus(final ServiceResponse response, final int statusCode,
+                                              final String message) throws JAXBException {
+        Assert.assertNotEquals(response.message, "null", "response message should not be null");
+        final APIResult result = Util.parseResponse(response);
+        Assert.assertEquals(result.getStatus(),
+            APIResult.Status.FAILED, message);
+        Assert.assertEquals(result.getStatusCode(), statusCode,
+            message);
+        Assert.assertNotNull(result.getRequestId(), "RequestId is null");
+    }
+
+    /**
+     * Checks that ServiceResponse status is PARTIAL with status code 400.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertPartial(ServiceResponse response) throws JAXBException {
+        final APIResult result = Util.parseResponse(response);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.PARTIAL);
+        Assert.assertEquals(result.getStatusCode(), 400);
+        Assert.assertNotNull(result.getMessage());
+    }
+
+    /**
+     * Checks that ServiceResponse status is FAILED with status code 400.
+     *
+     * @param response ServiceResponse
+     * @throws JAXBException
+     */
+    public static void assertFailed(ServiceResponse response) throws JAXBException {
+        Assert.assertNotEquals(response.message, "null", "response message should not be null");
+
+        final APIResult result = Util.parseResponse(response);
+        Assert.assertEquals(result.getStatus(), APIResult.Status.FAILED);
+        Assert.assertEquals(result.getStatusCode(), 400);
+    }
+
+    /**
+     * Checks that status of some entity job is equal to expected. Method can wait
+     * 100 seconds for expected status.
+     *
+     * @param oozieClient OozieClient
+     * @param entityType FEED or PROCESS
+     * @param data feed or process XML
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkStatus(OozieClient oozieClient, EntityType entityType, String data,
+                                   Job.Status expectedStatus)
+        throws OozieClientException {
+        // The name is only read for feeds and processes; it stays null for other types.
+        String name = null;
+        if (entityType == EntityType.FEED || entityType == EntityType.PROCESS) {
+            name = Util.readEntityName(data);
+        }
+        Assert.assertEquals(
+            OozieUtil.verifyOozieJobStatus(oozieClient, name, entityType, expectedStatus), true,
+            "Status should be " + expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is equal to expected. Method can wait
+     * 100 seconds for expected status.
+     *
+     * @param oozieClient OozieClient
+     * @param entityType FEED or PROCESS
+     * @param bundle Bundle with feed or process data
+     * @param expectedStatus expected Job.Status of entity
+     * @throws OozieClientException
+     */
+    public static void checkStatus(OozieClient oozieClient, EntityType entityType, Bundle bundle,
+                                   Job.Status expectedStatus)
+        throws OozieClientException {
+        String data = null;
+        if (entityType == EntityType.FEED) {
+            data = bundle.getDataSets().get(0);
+        } else if (entityType == EntityType.PROCESS) {
+            data = bundle.getProcessData();
+        }
+        checkStatus(oozieClient, entityType, data, expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is NOT equal to expected.
+     *
+     * @param oozieClient OozieClient
+     * @param entityType FEED or PROCESS
+     * @param data feed or process XML
+     * @param expectedStatus Job.Status the entity must not have
+     * @throws OozieClientException
+     */
+    public static void checkNotStatus(OozieClient oozieClient, EntityType entityType, String data,
+                                      Job.Status expectedStatus)
+        throws OozieClientException {
+        // The name is only read for feeds and processes; it stays null for other types.
+        String name = null;
+        if (entityType == EntityType.FEED || entityType == EntityType.PROCESS) {
+            name = Util.readEntityName(data);
+        }
+        Assert.assertNotEquals(OozieUtil.getOozieJobStatus(oozieClient, name,
+            entityType), expectedStatus, "Status should not be " + expectedStatus);
+    }
+
+    /**
+     * Checks that status of some entity job is NOT equal to expected.
+     *
+     * @param oozieClient OozieClient
+     * @param entityType FEED or PROCESS
+     * @param bundle Bundle with feed or process data
+     * @param expectedStatus Job.Status the entity must not have
+     * @throws OozieClientException
+     */
+    public static void checkNotStatus(OozieClient oozieClient, EntityType entityType,
+                                      Bundle bundle, Job.Status expectedStatus)
+        throws OozieClientException {
+        String data = null;
+        if (entityType == EntityType.FEED) {
+            data = bundle.getDataSets().get(0);
+        } else if (entityType == EntityType.PROCESS) {
+            data = bundle.getProcessData();
+        }
+        checkNotStatus(oozieClient, entityType, data, expectedStatus);
+    }
+
+    /**
+     * Checks that the content at two locations has the same total length.
+     *
+     * @param firstPath path to the first location
+     * @param secondPath path to the second location
+     * @param fs hadoop file system for the locations
+     * @throws IOException
+     */
+    public static void checkContentSize(String firstPath, String secondPath, FileSystem fs) throws
+        IOException {
+        final ContentSummary firstSummary = fs.getContentSummary(new Path(firstPath));
+        final ContentSummary secondSummary = fs.getContentSummary(new Path(secondPath));
+        LOGGER.info(firstPath + " : firstSummary = " + firstSummary.toString(false));
+        LOGGER.info(secondPath + " : secondSummary = " + secondSummary.toString(false));
+        Assert.assertEquals(firstSummary.getLength(), secondSummary.getLength(),
+            "Contents at the two locations don't have same size.");
+    }
+
+    /**
+     * Fail the test because of the supplied exception, which is kept as the cause of the
+     * failure.
+     * @param e exception
+     */
+    public static void fail(Exception e) {
+        LOGGER.info("Got exception: " + ExceptionUtils.getStackTrace(e));
+        Assert.fail("Failing because of exception.", e);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
new file mode 100644
index 0000000..1f73523
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.regression.core.bundle.Bundle;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * util methods related to bundle.
+ */
+public final class BundleUtil {
+    private static final Logger LOGGER = Logger.getLogger(BundleUtil.class);
+
+    private BundleUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    public static Bundle readLateDataBundle() throws IOException {
+        return readBundleFromFolder("LateDataBundles");
+    }
+
+    public static Bundle readRetryBundle() throws IOException {
+        return readBundleFromFolder("RetryTests");
+    }
+
+    public static Bundle readRetentionBundle() throws IOException {
+        return readBundleFromFolder("RetentionBundles");
+    }
+
+    public static Bundle readELBundle() throws IOException {
+        return readBundleFromFolder("ELbundle");
+    }
+
+    public static Bundle readHCatBundle() throws IOException {
+        return readBundleFromFolder("hcat");
+    }
+
+    public static Bundle readHCat2Bundle() throws IOException {
+        return readBundleFromFolder("hcat_2");
+    }
+
+    public static Bundle readLocalDCBundle() throws IOException {
+        return readBundleFromFolder("LocalDC_feedReplicaltion_BillingRC");
+    }
+
+    public static Bundle readImpressionRCBundle() throws IOException {
+        return readBundleFromFolder("impressionRC");
+    }
+
+    public static Bundle readUpdateBundle() throws IOException {
+        return readBundleFromFolder("updateBundle");
+    }
+
+    /**
+     * Builds a bundle from all xml files found (recursively) in a classpath folder. Files are
+     * read in sorted order and classified by the namespace they mention: the last cluster
+     * file and the last process file win, every feed file is added to the data sets.
+     *
+     * @param folderPath classpath folder, relative to the classpath root
+     * @return bundle built from the cluster, feed and process definitions
+     * @throws IOException if a file cannot be read
+     */
+    private static Bundle readBundleFromFolder(final String folderPath) throws IOException {
+        LOGGER.info("Loading xmls from directory: " + folderPath);
+        // getResource returns null for a missing folder; fail clearly instead of with an NPE.
+        final java.net.URL resource = BundleUtil.class.getResource("/" + folderPath);
+        if (resource == null) {
+            Assert.fail("could not find dir: " + folderPath);
+        }
+        File directory = null;
+        try {
+            directory = new File(resource.toURI());
+        } catch (URISyntaxException e) {
+            Assert.fail("could not find dir: " + folderPath, e);
+        }
+        final Collection<File> list = FileUtils.listFiles(directory, new String[] {"xml"}, true);
+        File[] files = list.toArray(new File[list.size()]);
+        Arrays.sort(files);
+        String clusterData = "";
+        final List<String> dataSets = new ArrayList<String>();
+        String processData = "";
+
+        for (File file : files) {
+            LOGGER.info("Loading data from path: " + file.getAbsolutePath());
+            final String data = IOUtils.toString(file.toURI());
+
+            if (data.contains("uri:ivory:cluster:0.1") || data.contains("uri:falcon:cluster:0.1")) {
+                LOGGER.info("data been added to cluster");
+                clusterData = data;
+            } else if (data.contains("uri:ivory:feed:0.1")
+                || data.contains("uri:falcon:feed:0.1")) {
+                LOGGER.info("data been added to feed");
+                dataSets.add(InstanceUtil.setFeedACL(data));
+            } else if (data.contains("uri:ivory:process:0.1")
+                || data.contains("uri:falcon:process:0.1")) {
+                LOGGER.info("data been added to process");
+                processData = data;
+            }
+        }
+        // clusterData starts out as "", so emptiness (not null) is what signals a missing file.
+        Assert.assertFalse(clusterData.isEmpty(), "expecting cluster data to be non-empty");
+        Assert.assertFalse(dataSets.isEmpty(), "expecting feed data to be non-empty");
+        return new Bundle(clusterData, dataSets, processData);
+    }
+
+    /**
+     * Submits the first cluster of every bundle and asserts that each response message
+     * contains "SUCCEEDED".
+     *
+     * @param prismHelper helper whose cluster helper performs the submission
+     * @param b bundles whose first cluster is submitted
+     */
+    public static void submitAllClusters(ColoHelper prismHelper, Bundle... b)
+        throws IOException, URISyntaxException, AuthenticationException {
+        for (Bundle aB : b) {
+            ServiceResponse r = prismHelper.getClusterHelper()
+                .submitEntity(Util.URLS.SUBMIT_URL, aB.getClusters().get(0));
+            Assert.assertTrue(r.getMessage().contains("SUCCEEDED"));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
new file mode 100644
index 0000000..5415083
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/CleanupUtil.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.falcon.regression.core.helpers.ColoHelper;
+import org.apache.falcon.regression.core.interfaces.IEntityManagerHelper;
+import org.apache.falcon.regression.core.response.EntitiesResult;
+import org.apache.falcon.regression.core.response.EntityResult;
+import org.apache.falcon.regression.core.response.ServiceResponse;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.log4j.Logger;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Util methods for listing and deleting the entities (processes, feeds, clusters)
+ * known to a server, typically used to clean up between test runs.
+ */
+public final class CleanupUtil {
+    private CleanupUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(CleanupUtil.class);
+
+    /**
+     * @param prism helper whose process helper is queried
+     * @return names of all processes returned by the list call
+     */
+    public static List<String> getAllProcesses(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getProcessHelper());
+    }
+
+    /**
+     * @param prism helper whose feed helper is queried
+     * @return names of all feeds returned by the list call
+     */
+    public static List<String> getAllFeeds(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getFeedHelper());
+    }
+
+    /**
+     * @param prism helper whose cluster helper is queried
+     * @return names of all clusters returned by the list call
+     */
+    public static List<String> getAllClusters(ColoHelper prism)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        return getAllEntitiesOfOneType(prism.getClusterHelper());
+    }
+
+    /**
+     * Lists the entities handled by the given helper and collects their names.
+     */
+    private static List<String> getAllEntitiesOfOneType(IEntityManagerHelper iEntityManagerHelper)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        final EntitiesResult entitiesResult = getEntitiesResultOfOneType(iEntityManagerHelper);
+        final List<String> names = new ArrayList<String>();
+        for (EntityResult entity : entitiesResult.getEntities()) {
+            names.add(entity.getName());
+        }
+        return names;
+    }
+
+    /**
+     * Calls the list url of the given helper and unmarshals the response message
+     * into an {@link EntitiesResult}.
+     */
+    private static EntitiesResult getEntitiesResultOfOneType(
+        IEntityManagerHelper iEntityManagerHelper)
+        throws IOException, URISyntaxException, AuthenticationException, JAXBException {
+        final ServiceResponse listResponse =
+            iEntityManagerHelper.listEntities(Util.URLS.LIST_URL);
+        final Unmarshaller unmarshaller =
+            JAXBContext.newInstance(EntitiesResult.class).createUnmarshaller();
+        return (EntitiesResult) unmarshaller.unmarshal(
+            new StringReader(listResponse.getMessage()));
+    }
+
+    /**
+     * Deletes every named entity through the given helper. A failure to delete one entity
+     * is logged and does not stop the remaining deletions.
+     */
+    private static void deleteAllQuietly(IEntityManagerHelper helper, List<String> names) {
+        for (String name : names) {
+            try {
+                helper.deleteByName(Util.URLS.DELETE_URL, name, null);
+            } catch (Exception e) {
+                LOGGER.warn("Caught exception: " + ExceptionUtils.getStackTrace(e));
+            }
+        }
+    }
+
+    /**
+     * Deletes all clusters; every exception is logged and swallowed.
+     * @param prism helper used both for listing and for deleting
+     */
+    public static void cleanAllClustersQuietly(ColoHelper prism) {
+        try {
+            deleteAllQuietly(prism.getClusterHelper(), getAllClusters(prism));
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get a list of clusters because of exception: "
+                + ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    /**
+     * Deletes all feeds; every exception is logged and swallowed.
+     * @param prism helper used both for listing and for deleting
+     */
+    public static void cleanAllFeedsQuietly(ColoHelper prism) {
+        try {
+            deleteAllQuietly(prism.getFeedHelper(), getAllFeeds(prism));
+        } catch (Exception e) {
+            LOGGER.warn("Unable to get a list of feeds because of exception: "
+                + ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    /**
+     * Deletes all processes; every exception is logged and swallowed.
+     * @param prism helper used for listing the processes
+     * @param entityManagerHelper helper used for deleting each listed process
+     */
+    public static void cleanAllProcessesQuietly(ColoHelper prism,
+                                                IEntityManagerHelper entityManagerHelper) {
+        try {
+            deleteAllQuietly(entityManagerHelper, getAllProcesses(prism));
+        } catch (Exception e) {
+            // the message used to say "feeds" here, which was a copy-paste slip
+            LOGGER.warn("Unable to get a list of processes because of exception: "
+                + ExceptionUtils.getStackTrace(e));
+        }
+    }
+
+    /**
+     * Deletes processes first, then feeds, then clusters.
+     * @param prism helper used for listing and deleting
+     */
+    public static void cleanAllEntities(ColoHelper prism) {
+        cleanAllProcessesQuietly(prism, prism.getProcessHelper());
+        cleanAllFeedsQuietly(prism);
+        cleanAllClustersQuietly(prism);
+    }
+
+    /**
+     * Deletes a single entity; any exception is logged and swallowed.
+     * @param helper helper performing the delete
+     * @param feed the entity passed to the helper's delete call
+     */
+    public static void deleteQuietly(IEntityManagerHelper helper, String feed) {
+        try {
+            helper.delete(Util.URLS.DELETE_URL, feed);
+        } catch (Exception e) {
+            // logged at warn like the other swallowed delete failures in this class
+            LOGGER.warn("Caught exception: " + ExceptionUtils.getStackTrace(e));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
new file mode 100644
index 0000000..a6e83ef
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Config.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+/**
+ * Static access to the settings stored in Merlin.properties. The file is looked up as the
+ * classpath resource "/Merlin.properties" and loaded once, when this class is initialized.
+ */
+public class Config {
+    private static final Logger LOGGER = Logger.getLogger(Config.class);
+
+    private static final String MERLIN_PROPERTIES = "Merlin.properties";
+    // must stay below LOGGER: the constructor logs while the instance is being created
+    private static final Config INSTANCE = new Config(MERLIN_PROPERTIES);
+
+    private PropertiesConfiguration confObj;
+
+    private Config(String propFileName) {
+        LOGGER.info("Going to read properties from: " + propFileName);
+        final String resourceName = "/" + propFileName;
+        try {
+            confObj = new PropertiesConfiguration(Config.class.getResource(resourceName));
+        } catch (ConfigurationException e) {
+            Assert.fail("Could not read properties because of exception: " + e);
+        }
+    }
+
+    /**
+     * @param key property name
+     * @return the string value the loaded configuration holds for the key
+     */
+    public static String getProperty(String key) {
+        final PropertiesConfiguration conf = INSTANCE.confObj;
+        return conf.getString(key);
+    }
+
+    /**
+     * @param key property name
+     * @param defaultValue value handed to the configuration as the default for the lookup
+     * @return the string value the loaded configuration returns for the key
+     */
+    public static String getProperty(String key, String defaultValue) {
+        final PropertiesConfiguration conf = INSTANCE.confObj;
+        return conf.getString(key, defaultValue);
+    }
+
+    /**
+     * @param key property name
+     * @return the string array the loaded configuration holds for the key
+     */
+    public static String[] getStringArray(String key) {
+        final PropertiesConfiguration conf = INSTANCE.confObj;
+        return conf.getStringArray(key);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
new file mode 100644
index 0000000..d240e76
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/ExecUtil.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import com.jcraft.jsch.UserInfo;
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Util methods for executing commands, either on a remote host over ssh
+ * or as a local child process.
+ */
+public final class ExecUtil {
+    private ExecUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(ExecUtil.class);
+
+    /**
+     * Runs a command on a remote host over ssh (port 22) and collects its console output.
+     * <p>
+     * When a non-empty password is given it is used for the ssh login and is also written to
+     * the remote console after a fixed wait (to answer the sudo prompt); otherwise the
+     * identity file is used for the login. Host key checking is disabled.
+     * </p>
+     *
+     * @param hostName host to connect to
+     * @param userName ssh user
+     * @param password password of the ssh user, may be null or empty
+     * @param command command to run
+     * @param runAs user the command should run as; null or empty means plain sudo, and a
+     *              value equal to userName means the command is run without sudo
+     * @param identityFile private key file, used only when no password is given
+     * @return the lines printed by the command on the remote console
+     * @throws JSchException if the ssh session or channel can't be set up
+     * @throws IOException if reading from or writing to the remote console fails
+     */
+    static List<String> runRemoteScriptAsSudo(final String hostName, final String userName,
+                                              final String password, final String command,
+                                              final String runAs, final String identityFile) throws
+        JSchException, IOException {
+        final boolean hasPassword = null != password && !password.isEmpty();
+        JSch jsch = new JSch();
+        Session session = jsch.getSession(userName, hostName, 22);
+        if (hasPassword) {
+            session.setUserInfo(new HardcodedUserInfo(password));
+        } else {
+            // no password available: authenticate with the private key instead
+            jsch.addIdentity(identityFile);
+        }
+        Properties config = new Properties();
+        config.setProperty("StrictHostKeyChecking", "no");
+        config.setProperty("UserKnownHostsFile", "/dev/null");
+        session.setConfig(config);
+
+        ChannelExec channel = null;
+        InputStream in = null;
+        OutputStream out = null;
+        try {
+            session.connect();
+            Assert.assertTrue(session.isConnected(), "The session was not connected correctly!");
+
+            channel = (ChannelExec) session.openChannel("exec");
+            channel.setPty(true);
+            final String runCmd = buildRunCommand(userName, runAs, command);
+            // the password itself must never end up in the log
+            LOGGER.info(
+                "host_name: " + hostName + " user_name: " + userName + " password: "
+                    + (hasPassword ? "<hidden>" : "<none>")
+                    + " command: " + runCmd);
+            channel.setCommand(runCmd);
+            in = channel.getInputStream();
+            out = channel.getOutputStream();
+            channel.setErrStream(System.err);
+            channel.connect();
+            TimeUtil.sleepSeconds(20);
+            // answer the password prompt only if there is a password
+            if (hasPassword) {
+                out.write((password + "\n").getBytes());
+                out.flush();
+            }
+
+            //save console output to data
+            final List<String> data = new ArrayList<String>();
+            BufferedReader r = new BufferedReader(new InputStreamReader(in));
+            String line;
+            while (true) {
+                while ((line = r.readLine()) != null) {
+                    LOGGER.debug(line);
+                    data.add(line);
+                }
+                if (channel.isClosed()) {
+                    break;
+                }
+                // end of stream was seen but the channel is still open: wait instead of spinning
+                TimeUtil.sleepSeconds(1);
+            }
+            // the stream is at its end and the channel is closed, nothing is left to drain
+            LOGGER.info("exit-status: " + channel.getExitStatus());
+            return data;
+        } finally {
+            // release the connection even when one of the steps above has thrown
+            closeQuietly(in);
+            if (channel != null) {
+                channel.disconnect();
+            }
+            session.disconnect();
+            closeQuietly(out);
+        }
+    }
+
+    /**
+     * Wraps the command with sudo as required by the combination of userName and runAs.
+     */
+    private static String buildRunCommand(String userName, String runAs, String command) {
+        if (userName.equals(runAs)) {
+            return command;
+        }
+        if (null == runAs || runAs.isEmpty()) {
+            return "sudo -S -p '' " + command;
+        }
+        return String.format("sudo su - %s -c '%s'", runAs, command);
+    }
+
+    /**
+     * Closes the stream if there is one, logging instead of propagating a failure.
+     */
+    private static void closeQuietly(java.io.Closeable closeable) {
+        if (closeable == null) {
+            return;
+        }
+        try {
+            closeable.close();
+        } catch (IOException e) {
+            LOGGER.warn("Unable to close stream: " + e);
+        }
+    }
+
+    /**
+     * Reads the reader up to its end, appending every line followed by a newline to target.
+     */
+    private static void appendLines(BufferedReader reader, StringBuilder target)
+        throws IOException {
+        String line;
+        while ((line = reader.readLine()) != null) {
+            target.append(line).append("\n");
+        }
+    }
+
+    /**
+     * Runs a command as a local child process and waits for it to finish.
+     * The test is failed (through a testng assertion) if the process can't be started, its
+     * output can't be read or the wait is interrupted.
+     *
+     * @param command command line to execute
+     * @return exit value together with the trimmed standard output and standard error
+     */
+    public static ExecResult executeCommand(String command) {
+        LOGGER.info("Command to be executed: " + command);
+        final StringBuilder errors = new StringBuilder();
+        StringBuilder output = new StringBuilder();
+
+        try {
+            Process process = Runtime.getRuntime().exec(command);
+
+            final BufferedReader errorReader =
+                new BufferedReader(new InputStreamReader(process.getErrorStream()));
+            BufferedReader consoleReader =
+                new BufferedReader(new InputStreamReader(process.getInputStream()));
+
+            // stderr is drained on its own thread: reading the two streams one after the
+            // other can block forever once the child fills the pipe that isn't being read
+            final IOException[] errorReadFailure = new IOException[1];
+            Thread errorDrainer = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        appendLines(errorReader, errors);
+                    } catch (IOException e) {
+                        errorReadFailure[0] = e;
+                    }
+                }
+            });
+            errorDrainer.start();
+            appendLines(consoleReader, output);
+            errorDrainer.join();
+            if (errorReadFailure[0] != null) {
+                throw errorReadFailure[0];
+            }
+
+            final int exitVal = process.waitFor();
+            LOGGER.info("exitVal: " + exitVal);
+            LOGGER.info("output: " + output);
+            LOGGER.info("errors: " + errors);
+            return new ExecResult(exitVal, output.toString().trim(), errors.toString().trim());
+        } catch (InterruptedException e) {
+            // keep the interrupt visible to the caller's thread
+            Thread.currentThread().interrupt();
+            Assert.fail("Process execution failed:" + ExceptionUtils.getStackTrace(e));
+        } catch (IOException e) {
+            Assert.fail("Process execution failed:" + ExceptionUtils.getStackTrace(e));
+        }
+        return null;
+    }
+
+    /**
+     * @param command command line to execute
+     * @return exit value of the command
+     */
+    public static int executeCommandGetExitCode(String command) {
+        return executeCommand(command).getExitVal();
+    }
+
+    /**
+     * @param command command line to execute
+     * @return trimmed standard output of the command
+     */
+    public static String executeCommandGetOutput(String command) {
+        return executeCommand(command).getOutput();
+    }
+
+    /**
+     * UserInfo that hands out a fixed password and answers every prompt with yes.
+     */
+    private static final class HardcodedUserInfo implements UserInfo {
+
+        private final String password;
+
+        private HardcodedUserInfo(String password) {
+            this.password = password;
+        }
+
+        @Override
+        public String getPassphrase() {
+            return null;
+        }
+
+        @Override
+        public String getPassword() {
+            return password;
+        }
+
+        @Override
+        public boolean promptPassword(String s) {
+            return true;
+        }
+
+        @Override
+        public boolean promptPassphrase(String s) {
+            return true;
+        }
+
+        @Override
+        public boolean promptYesNo(String s) {
+            return true;
+        }
+
+        @Override
+        public void showMessage(String s) {
+            LOGGER.info("message = " + s);
+        }
+    }
+
+    /**
+     * Outcome of a local command: exit value, standard output and standard error.
+     */
+    private static final class ExecResult {
+
+        private final int exitVal;
+        private final String output;
+        private final String error;
+
+        private ExecResult(final int exitVal, final String output, final String error) {
+            this.exitVal = exitVal;
+            this.output = output;
+            this.error = error;
+        }
+
+        public int getExitVal() {
+            return exitVal;
+        }
+
+        public String getOutput() {
+            return output;
+        }
+
+        public String getError() {
+            return error;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
new file mode 100644
index 0000000..86f9d6f
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/Generator.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+/**
+ * Produces a sequence of strings built from a fixed prefix, a running number starting at 1
+ * (zero padded to three digits) and a fixed postfix.
+ */
+public class Generator {
+    final String prefix;
+    final String postfix;
+    final String formatString;
+    private int count;
+
+    private Generator(String prefix, String postfix, String formatString) {
+        this.formatString = formatString;
+        this.prefix = prefix;
+        this.postfix = postfix;
+        this.count = 0;
+    }
+
+    /**
+     * @param prefix text placed before the number
+     * @param postfix text placed after the number and a dash
+     * @return generator producing prefix + number + "-" + postfix
+     */
+    public static Generator getNameGenerator(String prefix, String postfix) {
+        final String nameFormat = "%s%03d-%s";
+        return new Generator(prefix, postfix, nameFormat);
+    }
+
+    /**
+     * @param prefix text placed before an underscore and the number
+     * @param postfix text placed directly after the number
+     * @return generator producing prefix + "_" + number + postfix
+     */
+    public static Generator getHadoopPathGenerator(String prefix, String postfix) {
+        final String pathFormat = "%s_%03d%s";
+        return new Generator(prefix, postfix, pathFormat);
+    }
+
+    /**
+     * Advances the running number and formats the next string of the sequence.
+     * @return the next generated string
+     */
+    public String generate() {
+        final int next = ++count;
+        return String.format(formatString, prefix, next, postfix);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
new file mode 100644
index 0000000..ed0ea48
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/GraphAssert.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.falcon.regression.core.response.lineage.Edge;
+import org.apache.falcon.regression.core.response.lineage.EdgesResult;
+import org.apache.falcon.regression.core.response.lineage.GraphResult;
+import org.apache.falcon.regression.core.response.lineage.NODE_TYPE;
+import org.apache.falcon.regression.core.response.lineage.Vertex;
+import org.apache.falcon.regression.core.response.lineage.VerticesResult;
+import org.apache.log4j.Logger;
+import org.testng.Assert;
+
+/**
+ * util methods for Graph Asserts.
+ */
+public final class GraphAssert {
+    private GraphAssert() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+    private static final Logger LOGGER = Logger.getLogger(GraphAssert.class);
+
+    /**
+     * Check that the result has certain minimum number of vertices.
+     * @param graphResult the result to be checked
+     * @param minNumOfVertices required number of vertices
+     */
+    public static void checkVerticesPresence(final GraphResult graphResult,
+                                             final int minNumOfVertices) {
+        Assert.assertTrue(graphResult.getTotalSize() >= minNumOfVertices,
+            "graphResult should have at least " + minNumOfVertices + " vertex");
+    }
+
+    /**
+     * Check that the vertices in the result are sane: the reported total matches the number
+     * of results and every vertex has an id, the vertex node type, a name, a type and a
+     * timestamp.
+     * @param verticesResult the result to be checked
+     */
+    public static void assertVertexSanity(final VerticesResult verticesResult) {
+        Assert.assertEquals(verticesResult.getResults().size(), verticesResult.getTotalSize(),
+            "Size of vertices don't match");
+        for (Vertex vertex : verticesResult.getResults()) {
+            Assert.assertNotNull(vertex.get_id(),
+                "id of the vertex should be non-null: " + vertex);
+            Assert.assertEquals(vertex.get_type(), NODE_TYPE.VERTEX,
+                "_type of the vertex should be " + NODE_TYPE.VERTEX + ": " + vertex);
+            Assert.assertNotNull(vertex.getName(),
+                "name of the vertex should be non-null: " + vertex);
+            Assert.assertNotNull(vertex.getType(),
+                "type of the vertex should be non-null: " + vertex);
+            Assert.assertNotNull(vertex.getTimestamp(),
+                "timestamp of the vertex should be non-null: " + vertex);
+        }
+    }
+
+    /**
+     * Check that edges in the result are sane: the reported total matches the number of
+     * results and every edge passes {@link #assertEdgeSanity(Edge)}.
+     * @param edgesResult result to be checked
+     */
+    public static void assertEdgeSanity(final EdgesResult edgesResult) {
+        Assert.assertEquals(edgesResult.getResults().size(), edgesResult.getTotalSize(),
+            "Size of edges don't match");
+        for (Edge edge : edgesResult.getResults()) {
+            assertEdgeSanity(edge);
+        }
+    }
+
+    /**
+     * Check that edge is sane: it has an id, the edge node type, a label and positive
+     * in/out vertex ids.
+     * @param edge edge to be checked
+     */
+    public static void assertEdgeSanity(Edge edge) {
+        Assert.assertNotNull(edge.get_id(), "id of an edge can't be null: " + edge);
+        Assert.assertEquals(edge.get_type(), NODE_TYPE.EDGE,
+            "_type of an edge should be " + NODE_TYPE.EDGE + ": " + edge);
+        Assert.assertNotNull(edge.get_label(), "_label of an edge can't be null: " + edge);
+        Assert.assertTrue(edge.get_inV() > 0, "_inV of an edge should be positive: " + edge);
+        Assert.assertTrue(edge.get_outV() > 0, "_outV of an edge should be positive: " + edge);
+    }
+
+    /**
+     * Check that user vertex is present.
+     * @param verticesResult the result to be checked
+     */
+    public static void assertUserVertexPresence(final VerticesResult verticesResult) {
+        checkVerticesPresence(verticesResult, 1);
+        for (Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getType() == Vertex.VERTEX_TYPE.USER
+                    && vertex.getName().equals(MerlinConstants.CURRENT_USER_NAME)) {
+                return;
+            }
+        }
+        Assert.fail(String.format("Vertex corresponding to user: %s is not present.",
+            MerlinConstants.CURRENT_USER_NAME));
+    }
+
+    /**
+     * Check that a vertex of a certain name is present.
+     * @param verticesResult the result to be checked
+     * @param name expected name
+     */
+    public static void assertVertexPresence(final VerticesResult verticesResult, final String name) {
+        checkVerticesPresence(verticesResult, 1);
+        for (Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getName().equals(name)) {
+                return;
+            }
+        }
+        Assert.fail(String.format("Vertex of name: %s is not present.", name));
+    }
+
+    /**
+     * Check that the result has at least a certain number of vertices of a certain type.
+     * @param verticesResult the result to be checked
+     * @param vertexType vertex type
+     * @param minOccurrence required number of vertices
+     */
+    public static void assertVerticesPresenceMinOccur(final VerticesResult verticesResult,
+                                                      final Vertex.VERTEX_TYPE vertexType,
+                                                      final int minOccurrence) {
+        // a requirement of zero (or less) is met by any result, including an empty one
+        if (minOccurrence <= 0) {
+            return;
+        }
+        int occurrence = 0;
+        for (Vertex vertex : verticesResult.getResults()) {
+            if (vertex.getType() == vertexType) {
+                LOGGER.info("Found vertex: " + vertex);
+                occurrence++;
+                if (occurrence >= minOccurrence) {
+                    return;
+                }
+            }
+        }
+        Assert.fail(String.format("Expected at least %d vertices of type %s. But found only %d",
+            minOccurrence, vertexType, occurrence));
+    }
+
+    /**
+     * Check result to contain at least a certain number of edges of a certain type.
+     * @param edgesResult result to be checked
+     * @param edgeLabel edge label
+     * @param minOccurrence required number of edges
+     */
+    public static void assertEdgePresenceMinOccur(final EdgesResult edgesResult,
+                                                  final Edge.LEBEL_TYPE edgeLabel,
+                                                  final int minOccurrence) {
+        // a requirement of zero (or less) is met by any result, including an empty one
+        if (minOccurrence <= 0) {
+            return;
+        }
+        int occurrence = 0;
+        for (Edge edge : edgesResult.getResults()) {
+            if (edge.get_label() == edgeLabel) {
+                LOGGER.info("Found edge: " + edge);
+                occurrence++;
+                if (occurrence >= minOccurrence) {
+                    return;
+                }
+            }
+        }
+        Assert.fail(String.format("Expected at least %d edges of label %s. But found only %d",
+            minOccurrence, edgeLabel, occurrence));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
new file mode 100644
index 0000000..d878ecb
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HCatUtil.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+
+import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
+
+/**
+ * Helpers for talking to HCatalog: building a client and creating string field schemas.
+ */
+public final class HCatUtil {
+    private HCatUtil() {
+        throw new AssertionError("Instantiating utility class...");
+    }
+
+    /**
+     * Creates an HCat client from a fresh HiveConf that points at the given metastore.
+     *
+     * @param hCatEndPoint value used for the metastore uris setting
+     * @param hiveMetaStorePrinciple value used for the metastore kerberos principal setting
+     * @return client created from the assembled configuration
+     * @throws HCatException if the client can't be created
+     */
+    public static HCatClient getHCatClient(String hCatEndPoint, String hiveMetaStorePrinciple)
+        throws HCatException {
+        final HiveConf conf = new HiveConf();
+
+        // where the metastore is and how to authenticate against it
+        conf.set("hive.metastore.local", "false");
+        conf.setVar(HiveConf.ConfVars.METASTOREURIS, hCatEndPoint);
+        conf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, hiveMetaStorePrinciple);
+        conf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, MerlinConstants.IS_SECURE);
+        conf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
+
+        // hooks and concurrency
+        final String analyzerHook = HCatSemanticAnalyzer.class.getName();
+        conf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, analyzerHook);
+        conf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+        conf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+        conf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+
+        final HCatClient client = HCatClient.create(conf);
+        return client;
+    }
+
+    /**
+     * @param fieldName name of the field
+     * @param comment comment attached to the field
+     * @return schema of a field of type STRING with the given name and comment
+     * @throws HCatException if the schema can't be created
+     */
+    @SuppressWarnings("deprecation")
+    public static HCatFieldSchema getStringSchema(String fieldName, String comment) throws HCatException {
+        final HCatFieldSchema.Type stringType = HCatFieldSchema.Type.STRING;
+        return new HCatFieldSchema(fieldName, stringType, comment);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/bdcf001f/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
new file mode 100644
index 0000000..c33700c
--- /dev/null
+++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/HadoopUtil.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.regression.core.util;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Util methods related to hadoop.
+ */
+public final class HadoopUtil {
+
+ /** Extra folder name appended to folder lists by createLateDataFoldersWithRandom and replenishData. */
+ public static final String SOMETHING_RANDOM = "somethingRandom";
+ /** log4j logger shared by all helpers in this class. */
+ private static final Logger LOGGER = Logger.getLogger(HadoopUtil.class);
+
+ /** Utility class with static members only; any attempt to construct it is rejected. */
+ private HadoopUtil() {
+ throw new AssertionError("Instantiating utility class...");
+ }
+    /**
+     * Lists the non-directory entries found directly under a location (no recursion).
+     *
+     * @param fs file system to query
+     * @param location path whose entries are listed
+     * @return string form of each non-directory entry's path; empty when the location does not exist
+     * @throws IOException if the file system cannot be queried
+     */
+    public static List<String> getAllFilesHDFS(FileSystem fs, Path location) throws IOException {
+        final List<String> filePaths = new ArrayList<String>();
+        if (fs.exists(location)) {
+            for (FileStatus entry : fs.listStatus(location)) {
+                if (isDir(entry)) {
+                    continue;
+                }
+                filePaths.add(entry.getPath().toString());
+            }
+        }
+        return filePaths;
+    }
+
+    /**
+     * Collects directories below a location, descending a bounded number of levels.
+     *
+     * <p>With {@code depth <= 0} only the immediate sub-directories are returned; each additional
+     * unit of depth descends one level further. A directory is always listed before its children.
+     *
+     * @param fs file system to query
+     * @param location directory to start from
+     * @param depth number of additional levels to descend
+     * @return the directories found
+     * @throws IOException if listing fails
+     */
+    public static List<Path> getAllDirsRecursivelyHDFS(
+        FileSystem fs, Path location, int depth) throws IOException {
+        final List<Path> dirs = new ArrayList<Path>();
+        for (FileStatus entry : fs.listStatus(location)) {
+            if (!isDir(entry)) {
+                continue;
+            }
+            dirs.add(entry.getPath());
+            if (depth > 0) {
+                dirs.addAll(getAllDirsRecursivelyHDFS(fs, entry.getPath(), depth - 1));
+            }
+        }
+        return dirs;
+    }
+
+    /**
+     * Recursively collects all files below a location, skipping {@code _SUCCESS} markers.
+     *
+     * <p>A file is skipped when the string form of its URI contains {@code "_SUCCESS"}; note that
+     * this is a substring match on the whole URI, not only on the file name.
+     *
+     * @param fs file system to query
+     * @param location directory to start from
+     * @return paths of the files found; empty when the location is missing or yields no listing
+     * @throws IOException if listing fails for a reason other than a missing path
+     */
+    public static List<Path> getAllFilesRecursivelyHDFS(
+        FileSystem fs, Path location) throws IOException {
+        final List<Path> returnList = new ArrayList<Path>();
+
+        final FileStatus[] stats;
+        try {
+            stats = fs.listStatus(location);
+        } catch (FileNotFoundException e) {
+            // A missing location is treated as "no files"; report it through the class logger
+            // instead of dumping the stack trace to stderr.
+            LOGGER.warn("Path not found while listing files: " + location, e);
+            return returnList;
+        }
+        if (stats == null) {
+            return returnList;
+        }
+
+        for (FileStatus stat : stats) {
+            if (isDir(stat)) {
+                returnList.addAll(getAllFilesRecursivelyHDFS(fs, stat.getPath()));
+            } else if (!stat.getPath().toUri().toString().contains("_SUCCESS")) {
+                returnList.add(stat.getPath());
+            }
+        }
+        return returnList;
+    }
+
+ @SuppressWarnings("deprecation")
+ private static boolean isDir(FileStatus stat) {
+ return stat.isDir();
+ }
+
+    /**
+     * Copies a local file or directory to a destination on the given file system.
+     *
+     * @param fs target file system
+     * @param dstHdfsDir destination path on {@code fs}
+     * @param srcFileLocation local source path
+     * @throws IOException if the copy fails
+     */
+    public static void copyDataToFolder(final FileSystem fs, final String dstHdfsDir,
+                                        final String srcFileLocation)
+        throws IOException {
+        final String message = String.format("Copying local dir %s to hdfs location %s on %s",
+            srcFileLocation, dstHdfsDir, fs.getUri());
+        LOGGER.info(message);
+        final Path source = new Path(srcFileLocation);
+        final Path destination = new Path(dstHdfsDir);
+        fs.copyFromLocalFile(source, destination);
+    }
+
+    /**
+     * Replaces a remote directory with a local one: the destination is deleted if it exists,
+     * then the local location is copied to it.
+     *
+     * @param fs target file system
+     * @param dstHdfsDir destination path on {@code fs}
+     * @param localLocation local source path
+     * @throws IOException if deleting or copying fails
+     */
+    public static void uploadDir(final FileSystem fs, final String dstHdfsDir,
+                                 final String localLocation)
+        throws IOException {
+        final String message = String.format(
+            "Uploading local dir %s to hdfs location %s", localLocation, dstHdfsDir);
+        LOGGER.info(message);
+        deleteDirIfExists(dstHdfsDir, fs);
+        copyDataToFolder(fs, dstHdfsDir, localLocation);
+    }
+
+    /**
+     * Returns the names (last path component) of the directories directly under a base directory.
+     *
+     * @param fs file system to query
+     * @param baseDir directory whose children are inspected
+     * @return names of the immediate sub-directories
+     * @throws IOException if listing fails
+     */
+    public static List<String> getHDFSSubFoldersName(FileSystem fs,
+                                                     String baseDir) throws IOException {
+        final List<String> folderNames = new ArrayList<String>();
+        final Path base = new Path(baseDir);
+        for (FileStatus entry : fs.listStatus(base)) {
+            if (!isDir(entry)) {
+                continue;
+            }
+            folderNames.add(entry.getPath().getName());
+        }
+        return folderNames;
+    }
+
+    /**
+     * Checks whether a folder directly contains a file whose path contains the given text.
+     *
+     * @param fs file system to query
+     * @param hdfsPath folder to inspect (not recursive)
+     * @param fileToCheckFor text looked for as a substring of each file path
+     * @return {@code true} as soon as one matching file path is found, {@code false} otherwise
+     * @throws IOException if listing fails
+     */
+    public static boolean isFilePresentHDFS(FileSystem fs, String hdfsPath, String fileToCheckFor)
+        throws IOException {
+        LOGGER.info("getting file from folder: " + hdfsPath);
+        for (String candidate : getAllFileNamesFromHDFS(fs, hdfsPath)) {
+            if (candidate.contains(fileToCheckFor)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+ private static List<String> getAllFileNamesFromHDFS(
+ FileSystem fs, String hdfsPath) throws IOException {
+
+ List<String> returnList = new ArrayList<String>();
+
+ LOGGER.info("getting file from folder: " + hdfsPath);
+ FileStatus[] stats = fs.listStatus(new Path(hdfsPath));
+
+ for (FileStatus stat : stats) {
+ String currentPath = stat.getPath().toUri().getPath(); // gives directory name
+ if (!isDir(stat)) {
+ returnList.add(currentPath);
+ }
+
+
+ }
+ return returnList;
+ }
+
+    /**
+     * Deletes a directory if it exists and creates it again, empty.
+     *
+     * @param fs file system to operate on
+     * @param path directory to recreate
+     * @throws IOException if deleting or creating fails
+     */
+    public static void recreateDir(FileSystem fs, String path) throws IOException {
+        deleteDirIfExists(path, fs);
+        final String defaultFsName = fs.getConf().get("fs.default.name");
+        LOGGER.info("creating hdfs dir: " + path + " on " + defaultFsName);
+        final Path dir = new Path(path);
+        fs.mkdirs(dir);
+    }
+
+    /**
+     * Recreates the same directory on each of the given file systems, in list order.
+     *
+     * @param fileSystems file systems to operate on
+     * @param path directory to recreate on each of them
+     * @throws IOException if any recreate fails; remaining file systems are then not processed
+     */
+    public static void recreateDir(List<FileSystem> fileSystems, String path) throws IOException {
+        for (final FileSystem targetFs : fileSystems) {
+            recreateDir(targetFs, path);
+        }
+    }
+
+    /**
+     * Recursively deletes a path when it exists; otherwise only logs that nothing was deleted.
+     *
+     * @param hdfsPath path to delete
+     * @param fs file system to operate on
+     * @throws IOException if the existence check or the delete fails
+     */
+    public static void deleteDirIfExists(String hdfsPath, FileSystem fs) throws IOException {
+        final Path target = new Path(hdfsPath);
+        if (!fs.exists(target)) {
+            LOGGER.info(String.format(
+                "Not deleting non-existing HDFS path: %s on %s", target, fs.getUri()));
+            return;
+        }
+        LOGGER.info(String.format("Deleting HDFS path: %s on %s", target, fs.getUri()));
+        fs.delete(target, true);
+    }
+
+    /**
+     * Copies local data into each remote location, using an empty remote path prefix.
+     * The list of locations produced by the prefixed overload is discarded.
+     *
+     * @param fs target file system
+     * @param inputPath local file or directory providing the data
+     * @param remoteLocations remote folders to populate
+     * @throws IOException if copying fails
+     */
+    public static void flattenAndPutDataInFolder(FileSystem fs, String inputPath,
+                                                 List<String> remoteLocations) throws IOException {
+        final String noPrefix = "";
+        flattenAndPutDataInFolder(fs, inputPath, noPrefix, remoteLocations);
+    }
+
+    /**
+     * Copies the regular files found at a local path into each of the given remote folders.
+     *
+     * <p>If {@code inputPath} is a directory, only the files directly inside it are used
+     * (sub-directories are skipped); otherwise {@code inputPath} itself is the single source.
+     * A non-empty prefix is deleted on {@code fs} first. Each remote location is the prefix
+     * followed by the folder; when that string contains a {@code :<port>/} section, everything
+     * up to and including the port is dropped so only the path remains.
+     *
+     * @param fs target file system
+     * @param inputPath local file or directory providing the data
+     * @param remotePathPrefix prefix prepended to every remote folder; {@code null} is treated as empty
+     * @param remoteLocations remote folders to populate
+     * @return the remote locations that were populated, in input order; empty when
+     *         {@code remoteLocations} is empty
+     * @throws IOException if the local directory cannot be listed or a remote operation fails
+     */
+    public static List<String> flattenAndPutDataInFolder(FileSystem fs, String inputPath,
+                                                         String remotePathPrefix,
+                                                         List<String> remoteLocations) throws IOException {
+        if (StringUtils.isNotEmpty(remotePathPrefix)) {
+            deleteDirIfExists(remotePathPrefix, fs);
+        }
+        LOGGER.info("Creating data in folders: \n" + remoteLocations);
+
+        final List<String> locations = new ArrayList<String>();
+        if (remoteLocations.isEmpty()) {
+            // Nothing to populate; also avoids an IndexOutOfBoundsException on get(0) below.
+            return locations;
+        }
+
+        final File input = new File(inputPath);
+        final File[] files = input.isDirectory() ? input.listFiles() : new File[]{input};
+        if (files == null) {
+            // listFiles() returns null on I/O error; an assert would be a no-op without -ea.
+            throw new IOException("Unable to list local files under " + inputPath);
+        }
+        final List<Path> filePaths = new ArrayList<Path>();
+        for (final File file : files) {
+            if (!file.isDirectory()) {
+                filePaths.add(new Path(file.getAbsolutePath()));
+            }
+        }
+        final Path[] sources = filePaths.toArray(new Path[filePaths.size()]);
+
+        String prefix = remotePathPrefix == null ? "" : remotePathPrefix;
+        if (!prefix.endsWith("/") && !remoteLocations.get(0).startsWith("/")) {
+            prefix += "/";
+        }
+
+        // Used to remove 'hdfs(hftp)://server:port'; both patterns are compiled once, outside the loop.
+        final Pattern portWithSlash = Pattern.compile(":[\\d]+/");
+        final Pattern port = Pattern.compile(":[\\d]+");
+        for (String remoteDir : remoteLocations) {
+            String remoteLocation = prefix + remoteDir;
+            if (portWithSlash.matcher(remoteLocation).find()) {
+                remoteLocation = port.split(remoteLocation)[1];
+            }
+            locations.add(remoteLocation);
+            LOGGER.info(String.format("copying to: %s files: %s",
+                fs.getUri() + remoteLocation, Arrays.toString(files)));
+            final Path remotePath = new Path(remoteLocation);
+            if (!fs.exists(remotePath)) {
+                fs.mkdirs(remotePath);
+            }
+            fs.copyFromLocalFile(false, true, sources, remotePath);
+        }
+        return locations;
+    }
+
+    /**
+     * Creates one folder per list entry under a prefix, plus the {@link #SOMETHING_RANDOM} folder.
+     *
+     * <p>Side effect: {@link #SOMETHING_RANDOM} is appended to the caller's {@code folderList}
+     * before the folders are created, so the list must be modifiable.
+     *
+     * @param fs file system to create the folders on
+     * @param folderPrefix string prepended to every folder name
+     * @param folderList folder names; modified in place
+     * @throws IOException if a folder cannot be created
+     */
+    public static void createLateDataFoldersWithRandom(FileSystem fs, String folderPrefix,
+                                                       List<String> folderList) throws IOException {
+        LOGGER.info("creating late data folders.....");
+        folderList.add(SOMETHING_RANDOM);
+        for (final String name : folderList) {
+            final Path target = new Path(folderPrefix + name);
+            fs.mkdirs(target);
+        }
+        LOGGER.info("created all late data folders.....");
+    }
+
+    /**
+     * Copies every entry of a local directory into each of the given folders.
+     *
+     * <p>If the directory cannot be listed, the varargs overload is still invoked, with no
+     * source files.
+     *
+     * @param fs target file system
+     * @param folderList folder names to copy into
+     * @param directory local directory whose entries are copied
+     * @param folderPrefix string prepended to every folder name
+     * @throws IOException if copying fails
+     */
+    public static void copyDataToFolders(FileSystem fs, List<String> folderList,
+                                         String directory, String folderPrefix) throws IOException {
+        LOGGER.info("copying data into folders....");
+        final File[] listing = new File(directory).listFiles();
+        final String[] fileLocations = new String[listing == null ? 0 : listing.length];
+        for (int idx = 0; idx < fileLocations.length; idx++) {
+            fileLocations[idx] = listing[idx].toString();
+        }
+        copyDataToFolders(fs, folderPrefix, folderList, fileLocations);
+    }
+
+    /**
+     * Populates each folder with a generated marker file and the given local files.
+     *
+     * <p>For every folder, a temporary local file named after the folder ('/' replaced by '_',
+     * suffix {@code .txt}) containing the text {@code folder} is written under
+     * {@code OSUtil.NORMAL_INPUT}, copied to the folder and deleted locally again. After that
+     * all {@code fileLocations} are copied to the folder, overwriting existing files.
+     *
+     * @param fs target file system
+     * @param folderPrefix string prepended to every folder name
+     * @param folderList folder names to copy into
+     * @param fileLocations local files to copy into every folder
+     * @throws IOException if writing the marker file or copying fails
+     */
+    public static void copyDataToFolders(FileSystem fs, final String folderPrefix,
+                                         List<String> folderList, String... fileLocations) throws IOException {
+        // The source paths do not depend on the target folder, so build them once.
+        final Path[] srcPaths = new Path[fileLocations.length];
+        for (int i = 0; i < srcPaths.length; ++i) {
+            srcPaths[i] = new Path(fileLocations[i]);
+        }
+
+        for (final String folder : folderList) {
+            final Path target = new Path(folderPrefix + folder);
+            final String folderSpace = folder.replaceAll("/", "_");
+            final File marker = new File(OSUtil.NORMAL_INPUT + folderSpace + ".txt");
+            if (!marker.exists() && !marker.createNewFile()) {
+                LOGGER.info("file could not be created");
+            }
+
+            try {
+                final FileWriter writer = new FileWriter(marker);
+                try {
+                    writer.append("folder");
+                } finally {
+                    // Close even when the write fails so the file handle is not leaked.
+                    writer.close();
+                }
+                fs.copyFromLocalFile(new Path(marker.getAbsolutePath()), target);
+            } finally {
+                // Always remove the temporary local file, including on failure.
+                if (!marker.delete()) {
+                    LOGGER.info("delete was not successful");
+                }
+            }
+
+            LOGGER.info(String.format("copying %s to %s%s on %s", Arrays.toString(srcPaths),
+                folderPrefix, folder, fs.getUri()));
+            fs.copyFromLocalFile(false, true, srcPaths, target);
+        }
+    }
+
+    /**
+     * Creates minute-dated folders (plus the {@link #SOMETHING_RANDOM} folder) under a prefix
+     * and copies the contents of {@code OSUtil.NORMAL_INPUT} into each of them.
+     *
+     * @param fs target file system
+     * @param interval passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param minuteSkip passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param folderPrefix string prepended to every folder name
+     * @throws IOException if creating folders or copying fails
+     */
+    public static void lateDataReplenish(FileSystem fs, int interval,
+                                         int minuteSkip, String folderPrefix) throws IOException {
+        final List<String> datedFolders = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        // The call below also appends SOMETHING_RANDOM to datedFolders, so the copy covers it too.
+        createLateDataFoldersWithRandom(fs, folderPrefix, datedFolders);
+        copyDataToFolders(fs, datedFolders, OSUtil.NORMAL_INPUT, folderPrefix);
+    }
+
+    /**
+     * Creates one folder per list entry under a prefix.
+     *
+     * @param fs file system to create the folders on
+     * @param folderPrefix string prepended to every folder name
+     * @param folderList folder names
+     * @throws IOException if a folder cannot be created
+     */
+    public static void createLateDataFolders(FileSystem fs, final String folderPrefix,
+                                             List<String> folderList) throws IOException {
+        for (final String name : folderList) {
+            final Path target = new Path(folderPrefix + name);
+            fs.mkdirs(target);
+        }
+    }
+
+    /**
+     * Copies every regular file of a local directory into a timestamped remote sub-folder.
+     *
+     * <p>The destination of each file is {@code remoteLocation/<current epoch seconds>/},
+     * evaluated per file, so files copied within the same second share a folder.
+     * Sub-directories of the local directory are skipped.
+     *
+     * @param fs target file system
+     * @param remoteLocation remote base folder
+     * @param localLocation local directory providing the files
+     * @throws FileNotFoundException if {@code localLocation} cannot be listed as a directory
+     * @throws IOException if copying fails
+     */
+    public static void injectMoreData(FileSystem fs, final String remoteLocation,
+                                      String localLocation) throws IOException {
+        final File[] files = new File(localLocation).listFiles();
+        if (files == null) {
+            // listFiles() returns null for a missing or unreadable directory; an assert would
+            // be skipped without -ea and lead to a NullPointerException instead.
+            throw new FileNotFoundException(
+                "Local location is not a readable directory: " + localLocation);
+        }
+        for (final File file : files) {
+            if (file.isDirectory()) {
+                continue;
+            }
+            final String path = remoteLocation + "/" + System.currentTimeMillis() / 1000 + "/";
+            LOGGER.info("inserting data@ " + path);
+            fs.copyFromLocalFile(new Path(file.getAbsolutePath()), new Path(path));
+        }
+    }
+
+    /**
+     * Creates minute-dated folders under a prefix and copies a single input file into each.
+     *
+     * <p>The file copied is {@code _SUCCESS} when {@code fileToBePut} equals that name and
+     * {@code log_01.txt} for any other value; both are taken from {@code OSUtil.NORMAL_INPUT}.
+     *
+     * @param fs target file system
+     * @param interval passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param minuteSkip passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param folderPrefix string prepended to every folder name
+     * @param fileToBePut selects which input file is copied
+     * @throws IOException if creating folders or copying fails
+     */
+    public static void putFileInFolderHDFS(FileSystem fs, int interval, int minuteSkip,
+                                           String folderPrefix, String fileToBePut)
+        throws IOException {
+        final List<String> datedFolders = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + datedFolders.toString());
+
+        createLateDataFolders(fs, folderPrefix, datedFolders);
+
+        final String fileName = fileToBePut.equals("_SUCCESS") ? "_SUCCESS" : "log_01.txt";
+        copyDataToFolders(fs, folderPrefix, datedFolders, OSUtil.NORMAL_INPUT + fileName);
+    }
+
+    /**
+     * Creates minute-dated folders under a prefix and copies only {@code log_01.txt} (no
+     * {@code _SUCCESS} file) from {@code OSUtil.NORMAL_INPUT} into each.
+     *
+     * @param fs target file system
+     * @param interval passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param minuteSkip passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param folderPrefix string prepended to every folder name
+     * @param postFix appended to every dated folder name; ignored when {@code null}
+     * @throws IOException if creating folders or copying fails
+     */
+    public static void lateDataReplenishWithoutSuccess(FileSystem fs, int interval,
+                                                       int minuteSkip, String folderPrefix,
+                                                       String postFix) throws IOException {
+        final List<String> datedFolders = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + datedFolders.toString());
+
+        // With a null postfix nothing is rewritten.
+        final int toRewrite = postFix == null ? 0 : datedFolders.size();
+        for (int idx = 0; idx < toRewrite; idx++) {
+            datedFolders.set(idx, datedFolders.get(idx) + postFix);
+        }
+
+        createLateDataFolders(fs, folderPrefix, datedFolders);
+        final String logFile = OSUtil.NORMAL_INPUT + "log_01.txt";
+        copyDataToFolders(fs, folderPrefix, datedFolders, logFile);
+    }
+
+    /**
+     * Creates minute-dated folders under a prefix and copies both {@code _SUCCESS} and
+     * {@code log_01.txt} from {@code OSUtil.NORMAL_INPUT} into each.
+     *
+     * @param fs target file system
+     * @param interval passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param minuteSkip passed to {@code TimeUtil.getMinuteDatesOnEitherSide}
+     * @param folderPrefix string prepended to every folder name
+     * @param postFix appended to every dated folder name; ignored when {@code null}
+     * @throws IOException if creating folders or copying fails
+     */
+    public static void lateDataReplenish(FileSystem fs, int interval, int minuteSkip,
+                                         String folderPrefix, String postFix) throws IOException {
+        final List<String> datedFolders = TimeUtil.getMinuteDatesOnEitherSide(interval, minuteSkip);
+        LOGGER.info("folderData: " + datedFolders.toString());
+
+        // With a null postfix nothing is rewritten.
+        final int toRewrite = postFix == null ? 0 : datedFolders.size();
+        for (int idx = 0; idx < toRewrite; idx++) {
+            datedFolders.set(idx, datedFolders.get(idx) + postFix);
+        }
+
+        createLateDataFolders(fs, folderPrefix, datedFolders);
+        final String successFile = OSUtil.NORMAL_INPUT + "_SUCCESS";
+        final String logFile = OSUtil.NORMAL_INPUT + "log_01.txt";
+        copyDataToFolders(fs, folderPrefix, datedFolders, successFile, logFile);
+    }
+
+    /**
+     * Purges a prefix and recreates the given folders below it, optionally with sample data.
+     *
+     * <p>Side effect: {@link #SOMETHING_RANDOM} is appended to the caller's {@code folderList},
+     * so the list must be modifiable. When {@code uploadData} is set, {@code log_01.txt} from
+     * {@code OSUtil.RESOURCES} is copied into every created folder.
+     *
+     * @param fileSystem file system to operate on
+     * @param prefix path that is deleted first and then prepended to every folder name
+     * @param folderList folder names; modified in place
+     * @param uploadData whether to copy the sample log file into each folder
+     * @throws IOException if deleting, creating or copying fails
+     */
+    public static void replenishData(FileSystem fileSystem, String prefix, List<String> folderList,
+                                     boolean uploadData) throws IOException {
+        // Start from a clean slate.
+        deleteDirIfExists(prefix, fileSystem);
+
+        folderList.add(SOMETHING_RANDOM);
+
+        for (final String name : folderList) {
+            final String pathString = prefix + name;
+            LOGGER.info(fileSystem.getUri() + pathString);
+            fileSystem.mkdirs(new Path(pathString));
+            if (!uploadData) {
+                continue;
+            }
+            fileSystem.copyFromLocalFile(new Path(OSUtil.RESOURCES + "log_01.txt"),
+                new Path(pathString));
+        }
+    }
+}