You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/03/01 09:26:20 UTC
[34/51] [partial] falcon git commit: FALCON-1830 Removed code source
directories and updated pom
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
deleted file mode 100644
index 9b1e1f4..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-
-/**
- * Captures the workflow execution context.
- */
-public class WorkflowExecutionContext {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class);
-
-    public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
-
-    public static final String OUTPUT_FEED_SEPARATOR = ",";
-    public static final String INPUT_FEED_SEPARATOR = "#";
-    public static final String CLUSTER_NAME_SEPARATOR = ",";
-
-    /**
-     * Workflow execution status.
-     */
-    public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED}
-
-    /**
-     * Workflow execution type.
-     */
-    public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION}
-
-    /**
-     * Entity operations supported.
-     */
-    public enum EntityOperations {
-        GENERATE, DELETE, REPLICATE, IMPORT, EXPORT
-    }
-
-    public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = {
-        WorkflowExecutionArgs.CLUSTER_NAME,
-        WorkflowExecutionArgs.ENTITY_NAME,
-        WorkflowExecutionArgs.ENTITY_TYPE,
-        WorkflowExecutionArgs.NOMINAL_TIME,
-        WorkflowExecutionArgs.OPERATION,
-
-        WorkflowExecutionArgs.OUTPUT_FEED_NAMES,
-        WorkflowExecutionArgs.OUTPUT_FEED_PATHS,
-
-        WorkflowExecutionArgs.WORKFLOW_ID,
-        WorkflowExecutionArgs.WORKFLOW_USER,
-        WorkflowExecutionArgs.RUN_ID,
-        WorkflowExecutionArgs.STATUS,
-        WorkflowExecutionArgs.TIMESTAMP,
-        WorkflowExecutionArgs.LOG_DIR,
-    };
-
-    // Backing map; stored by reference, so changes made through setValue() are visible to the
-    // map that was handed to the constructor.
-    private final Map<WorkflowExecutionArgs, String> context;
-    // Wall-clock time (System.currentTimeMillis()) at which this object was constructed.
-    private final long creationTime;
-    // Optional configuration used by serialize(); null unless set by create(String[], Type, Configuration).
-    private Configuration actionJobConf;
-
-    /**
-     * Wraps the given argument map. The map is not copied.
-     *
-     * @param context workflow arguments keyed by {@link WorkflowExecutionArgs}
-     */
-    public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) {
-        this.context = context;
-        creationTime = System.currentTimeMillis();
-    }
-
-    /**
-     * @param arg argument to look up
-     * @return the stored value, or null when the argument is absent
-     */
-    public String getValue(WorkflowExecutionArgs arg) {
-        return context.get(arg);
-    }
-
-    /**
-     * Stores or replaces the value of an argument in the backing map.
-     */
-    public void setValue(WorkflowExecutionArgs arg, String value) {
-        context.put(arg, value);
-    }
-
-    /**
-     * @param arg          argument to look up
-     * @param defaultValue value returned when the backing map has no entry for {@code arg}
-     * @return the stored value if the key is present (even if that value is null), else the default
-     */
-    public String getValue(WorkflowExecutionArgs arg, String defaultValue) {
-        return context.containsKey(arg) ? context.get(arg) : defaultValue;
-    }
-
-    public boolean containsKey(WorkflowExecutionArgs arg) {
-        return context.containsKey(arg);
-    }
-
-    public Set<Map.Entry<WorkflowExecutionArgs, String>> entrySet() {
-        return context.entrySet();
-    }
-
-    // helper methods
-
-    /** @return true when STATUS equals {@link Status#SUCCEEDED}. */
-    public boolean hasWorkflowSucceeded() {
-        return Status.SUCCEEDED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    /** @return true when STATUS equals {@link Status#FAILED}. */
-    public boolean hasWorkflowFailed() {
-        return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    /**
-     * Asks the workflow engine whether the workflow was killed by a user.
-     *
-     * @return the engine's answer, or false when the engine call fails for any reason
-     */
-    public boolean isWorkflowKilledManually(){
-        try {
-            return WorkflowEngineFactory.getWorkflowEngine().
-                    isWorkflowKilledByUser(
-                            getValue(WorkflowExecutionArgs.CLUSTER_NAME),
-                            getValue(WorkflowExecutionArgs.WORKFLOW_ID));
-        } catch (Exception e) {
-            // Best effort: pass the exception as the trailing argument so the stack trace is kept.
-            LOG.error("Got Error in getting error codes from actions: {}", e.toString(), e);
-        }
-        return false;
-    }
-
-    /** @return true when STATUS equals {@link Status#TIMEDOUT}. */
-    public boolean hasWorkflowTimedOut() {
-        return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    /** @return true when STATUS equals {@link Status#KILLED}. */
-    public boolean hasWorkflowBeenKilled() {
-        return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public String getContextFile() {
-        return getValue(WorkflowExecutionArgs.CONTEXT_FILE);
-    }
-
-    /**
-     * @return STATUS converted with {@link Status#valueOf(String)}; fails if STATUS is absent
-     *         or is not a constant of {@link Status}
-     */
-    public Status getWorkflowStatus() {
-        return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS));
-    }
-
-    public String getLogDir() {
-        return getValue(WorkflowExecutionArgs.LOG_DIR);
-    }
-
-    public String getLogFile() {
-        return getValue(WorkflowExecutionArgs.LOG_FILE);
-    }
-
-    String getNominalTime() {
-        return getValue(WorkflowExecutionArgs.NOMINAL_TIME);
-    }
-
-    /**
-     * Returns nominal time as a ISO8601 formatted string.
-     * @return a ISO8601 formatted string
-     */
-    public String getNominalTimeAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT);
-    }
-
-    String getTimestamp() {
-        return getValue(WorkflowExecutionArgs.TIMESTAMP);
-    }
-
-    /**
-     * Returns timestamp as a long.
-     *
-     * <p>The timestamp is parsed in UTC against the leading part of {@link #INSTANCE_FORMAT}
-     * that has the same length as the timestamp string, so TIMESTAMP must be present and must
-     * not be longer than the format.
-     *
-     * @return Date as long (milliseconds since epoch) for the timestamp.
-     * @throws RuntimeException wrapping the {@link java.text.ParseException} when parsing fails
-     */
-    public long getTimeStampAsLong() {
-        String dateString = getTimestamp();
-        try {
-            DateFormat dateFormat = new SimpleDateFormat(INSTANCE_FORMAT.substring(0, dateString.length()));
-            dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-            return dateFormat.parse(dateString).getTime();
-        } catch (java.text.ParseException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Returns timestamp as a ISO8601 formatted string.
-     * @return a ISO8601 formatted string
-     */
-    public String getTimeStampAsISO8601() {
-        return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT);
-    }
-
-    /**
-     * @return CLUSTER_NAME as stored, except for REPLICATE operations where the value is split on
-     *         {@link #CLUSTER_NAME_SEPARATOR} and the first element is returned
-     */
-    public String getClusterName() {
-        String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        return value.split(CLUSTER_NAME_SEPARATOR)[0];
-    }
-
-    /**
-     * @return CLUSTER_NAME as stored, except for REPLICATE operations where the value is split on
-     *         {@link #CLUSTER_NAME_SEPARATOR} and the second element is returned
-     * @throws IllegalArgumentException for REPLICATE when the value does not split into exactly two parts
-     */
-    public String getSrcClusterName() {
-        String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
-        if (EntityOperations.REPLICATE != getOperation()) {
-            return value;
-        }
-
-        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
-        if (parts.length != 2) {
-            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
-        }
-
-        return parts[1];
-    }
-
-    public String getEntityName() {
-        return getValue(WorkflowExecutionArgs.ENTITY_NAME);
-    }
-
-    /**
-     * @return ENTITY_TYPE in upper case. The conversion uses {@link java.util.Locale#ROOT} so the
-     *         result does not depend on the JVM default locale (e.g. Turkish dotted/dotless i)
-     *         and can be compared with enum constant names.
-     */
-    public String getEntityType() {
-        return getValue(WorkflowExecutionArgs.ENTITY_TYPE).toUpperCase(java.util.Locale.ROOT);
-    }
-
-    /**
-     * @return OPERATION converted to {@link EntityOperations}; when OPERATION is absent,
-     *         DATA_OPERATION is converted instead
-     */
-    public EntityOperations getOperation() {
-        String operation = getValue(WorkflowExecutionArgs.OPERATION);
-        if (operation != null) {
-            return EntityOperations.valueOf(operation);
-        }
-        return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION));
-    }
-
-    public String getOutputFeedNames() {
-        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_NAMES);
-    }
-
-    /** @return OUTPUT_FEED_NAMES split on {@link #OUTPUT_FEED_SEPARATOR}. */
-    public String[] getOutputFeedNamesList() {
-        return getOutputFeedNames().split(OUTPUT_FEED_SEPARATOR);
-    }
-
-    public String getOutputFeedInstancePaths() {
-        return getValue(WorkflowExecutionArgs.OUTPUT_FEED_PATHS);
-    }
-
-    /** @return OUTPUT_FEED_PATHS split on {@link #OUTPUT_FEED_SEPARATOR}. */
-    public String[] getOutputFeedInstancePathsList() {
-        return getOutputFeedInstancePaths().split(OUTPUT_FEED_SEPARATOR);
-    }
-
-    public String getInputFeedNames() {
-        return getValue(WorkflowExecutionArgs.INPUT_FEED_NAMES);
-    }
-
-    /** @return INPUT_FEED_NAMES split on {@link #INPUT_FEED_SEPARATOR}. */
-    public String[] getInputFeedNamesList() {
-        return getInputFeedNames().split(INPUT_FEED_SEPARATOR);
-    }
-
-    public String getInputFeedInstancePaths() {
-        return getValue(WorkflowExecutionArgs.INPUT_FEED_PATHS);
-    }
-
-    /** @return INPUT_FEED_PATHS split on {@link #INPUT_FEED_SEPARATOR}. */
-    public String[] getInputFeedInstancePathsList() {
-        return getInputFeedInstancePaths().split(INPUT_FEED_SEPARATOR);
-    }
-
-    public String getWorkflowEngineUrl() {
-        return getValue(WorkflowExecutionArgs.WF_ENGINE_URL);
-    }
-
-    public String getUserWorkflowEngine() {
-        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE);
-    }
-
-    public String getUserWorkflowVersion() {
-        return getValue(WorkflowExecutionArgs.USER_WORKFLOW_VERSION);
-    }
-
-    public String getWorkflowId() {
-        return getValue(WorkflowExecutionArgs.WORKFLOW_ID);
-    }
-
-    public String getWorkflowParentId() {
-        return getValue(WorkflowExecutionArgs.PARENT_ID);
-    }
-
-    public String getUserSubflowId() {
-        return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID);
-    }
-
-    /** @return RUN_ID parsed as an int; fails with NumberFormatException when absent or malformed. */
-    public int getWorkflowRunId() {
-        return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID));
-    }
-
-    /** @return RUN_ID parsed as an int and rendered back as a decimal string. */
-    public String getWorkflowRunIdString() {
-        return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)));
-    }
-
-    public String getWorkflowUser() {
-        return getValue(WorkflowExecutionArgs.WORKFLOW_USER);
-    }
-
-    /** @return the time at which this context object was constructed, in epoch milliseconds. */
-    public long getExecutionCompletionTime() {
-        return creationTime;
-    }
-
-    public String getDatasourceName() {
-        return getValue(WorkflowExecutionArgs.DATASOURCE_NAME);
-    }
-
-    /**
-     * @return WF_START_TIME parsed as a long, or 0 when the value is absent or blank
-     * @throws NumberFormatException when a non-blank value is not a valid long
-     */
-    public long getWorkflowStartTime() {
-        return getTimeOrZero(WorkflowExecutionArgs.WF_START_TIME);
-    }
-
-    /**
-     * @return WF_END_TIME parsed as a long, or 0 when the value is absent or blank
-     * @throws NumberFormatException when a non-blank value is not a valid long
-     */
-    public long getWorkflowEndTime() {
-        return getTimeOrZero(WorkflowExecutionArgs.WF_END_TIME);
-    }
-
-    // Reads a numeric time argument; an unset value maps to 0 instead of a NumberFormatException.
-    private long getTimeOrZero(WorkflowExecutionArgs arg) {
-        String value = getValue(arg);
-        return StringUtils.isBlank(value) ? 0L : Long.parseLong(value);
-    }
-
-    /** @return CONTEXT_TYPE converted with {@link Type#valueOf(String)}. */
-    public Type getContextType() {
-        return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE));
-    }
-
-    public String getCounters() {
-        return getValue(WorkflowExecutionArgs.COUNTERS);
-    }
-
-    /**
-     * this method is invoked from with in the workflow.
-     * Serializes the context to the file named by CONTEXT_FILE.
-     *
-     * @throws java.io.IOException declared for compatibility; failures surface as FalconException
-     * @throws org.apache.falcon.FalconException when the context cannot be written
-     */
-    public void serialize() throws IOException, FalconException {
-        serialize(getContextFile());
-    }
-
-    /**
-     * this method is invoked from with in the workflow.
-     * Writes the backing map as a JSON string, encoded as UTF-8, to the given file.
-     *
-     * @param contextFile file to serialize the workflow execution metadata
-     * @throws org.apache.falcon.FalconException when creating, writing or closing the file fails
-     */
-    public void serialize(String contextFile) throws FalconException {
-        LOG.info("Saving context to: [{}]", contextFile);
-        Path file = new Path(contextFile);
-        try {
-            FileSystem fs =
-                    actionJobConf == null ? HadoopClientFactory.get().createProxiedFileSystem(file.toUri())
-                            : HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), actionJobConf);
-            // try-with-resources: a failure while closing (flushing) the stream is reported
-            // through the catch below instead of being silently dropped.
-            try (OutputStream out = fs.create(file)) {
-                out.write(JSONValue.toJSONString(context).getBytes(java.nio.charset.StandardCharsets.UTF_8));
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error serializing context to: " + contextFile, e);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "WorkflowExecutionContext{" + context.toString() + "}";
-    }
-
-    /**
-     * Reads a context previously written as JSON and wraps the parsed map.
-     *
-     * <p>NOTE(review): the parsed JSON object is cast unchecked to
-     * {@code Map<WorkflowExecutionArgs, String>}; whether its keys are really enum constants
-     * cannot be told from this file - confirm before relying on enum-keyed lookups.
-     *
-     * @param contextFile path of the JSON context file
-     * @return a context backed by the parsed map
-     * @throws FalconException when the file cannot be opened or closed
-     */
-    @SuppressWarnings("unchecked")
-    public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
-        try {
-            Path lineageDataPath = new Path(contextFile); // file has 777 permissions
-            FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                    lineageDataPath.toUri());
-
-            // The reader is closed on every path; it was previously leaked.
-            try (BufferedReader in = new BufferedReader(new InputStreamReader(
-                    fs.open(lineageDataPath), java.nio.charset.StandardCharsets.UTF_8))) {
-                return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in));
-            }
-        } catch (IOException e) {
-            throw new FalconException("Error opening context file: " + contextFile, e);
-        }
-    }
-
-    /**
-     * Builds the path of the post-execution context file.
-     *
-     * @param logDir     base log directory
-     * @param entityName entity name, used as file name prefix
-     * @param entityType entity type name; compared against {@code EntityType.PROCESS.name()}
-     * @param operation  operation being performed
-     * @return {@code <logDir>/<entityName>-wf-post-exec-context.json} for process entities and
-     *         REPLICATE operations, otherwise the same file under {@code <logDir>/context/}
-     */
-    public static String getFilePath(String logDir, String entityName, String entityType,
-                                     EntityOperations operation) {
-        // needed by feed clean up
-        String parentSuffix = EntityType.PROCESS.name().equals(entityType)
-                || EntityOperations.REPLICATE == operation ? "" : "/context/";
-
-        // LOG_DIR is sufficiently unique
-        return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString();
-    }
-
-
-    /** @return the path of {@code counter.txt} directly under the given log directory. */
-    public static Path getCounterFile(String logDir) {
-        return new Path(logDir, "counter.txt");
-    }
-
-    /**
-     * Reads the counter file and joins its lines with commas.
-     *
-     * <p>NOTE(review): the file is decoded with the platform default charset because the
-     * writer of the counter file is not visible here - confirm the intended encoding.
-     *
-     * @param fs          file system holding the counter file
-     * @param counterFile path of the counter file
-     * @return the lines joined by ",", or null when the joined text is blank
-     * @throws IOException when opening or reading the file fails
-     */
-    public static String readCounters(FileSystem fs, Path counterFile) throws IOException{
-        StringBuilder counterBuffer = new StringBuilder();
-        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(counterFile)));
-        try {
-            String line;
-            while ((line = in.readLine()) != null) {
-                counterBuffer.append(line);
-                counterBuffer.append(",");
-            }
-        } finally {
-            IOUtils.closeQuietly(in);
-        }
-
-        String counterString = counterBuffer.toString();
-        if (StringUtils.isBlank(counterString)) {
-            return null;
-        }
-        // Every appended line is followed by a separator; drop the trailing one.
-        return counterString.substring(0, counterString.length() - 1);
-    }
-
-    public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException {
-        return create(args, type, null);
-    }
-
-    /**
-     * Builds a context from command line arguments.
-     *
-     * <p>Only options with a non-empty value are stored. CONTEXT_TYPE and CONTEXT_FILE are then
-     * set, and counters are attached when a counter file exists in the log directory.
-     *
-     * @param args command line arguments, one option per {@link WorkflowExecutionArgs}
-     * @param type context type recorded as CONTEXT_TYPE
-     * @param conf configuration later used by {@link #serialize(String)}; may be null
-     * @return the populated context
-     * @throws FalconException when the arguments cannot be parsed
-     */
-    public static WorkflowExecutionContext create(String[] args, Type type, Configuration conf) throws FalconException {
-        Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>();
-
-        try {
-            CommandLine cmd = getCommand(args);
-            for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-                String optionValue = arg.getOptionValue(cmd);
-                if (StringUtils.isNotEmpty(optionValue)) {
-                    wfProperties.put(arg, optionValue);
-                }
-            }
-        } catch (ParseException e) {
-            throw new FalconException("Error parsing wf args", e);
-        }
-
-        WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties);
-        executionContext.actionJobConf = conf;
-        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
-        executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE,
-                getFilePath(executionContext.getLogDir(), executionContext.getEntityName(),
-                        executionContext.getEntityType(), executionContext.getOperation()));
-        addCounterToWF(executionContext);
-
-        return executionContext;
-    }
-
-    // Copies the contents of the counter file (if any) into COUNTERS and then deletes the file.
-    // Nothing is read or deleted when the workflow status is FAILED.
-    private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException {
-        if (executionContext.hasWorkflowFailed()) {
-            LOG.info("Workflow Instance failed, counter will not be added: {}",
-                    executionContext.getWorkflowRunIdString());
-            return;
-        }
-
-        FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem(
-                new Path(executionContext.getLogDir()).toUri());
-        Path counterFile = getCounterFile(executionContext.getLogDir());
-        try {
-            if (fs.exists(counterFile)) {
-                String counters = readCounters(fs, counterFile);
-                if (StringUtils.isNotBlank(counters)) {
-                    executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters);
-                }
-            }
-        } catch (IOException e) {
-            // Best effort: a missing/unreadable counter file must not fail context creation.
-            LOG.error("Error in accessing counter file: {}", counterFile, e);
-        } finally {
-            try {
-                if (fs.exists(counterFile)) {
-                    fs.delete(counterFile, false);
-                }
-            } catch (IOException e) {
-                // The path fills the placeholder; the trailing exception keeps its stack trace.
-                LOG.error("Unable to delete counter file: {}", counterFile, e);
-            }
-        }
-    }
-
-    // Parses the arguments against one option per WorkflowExecutionArgs value.
-    private static CommandLine getCommand(String[] arguments) throws ParseException {
-        Options options = new Options();
-
-        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-            addOption(options, arg, arg.isRequired());
-        }
-
-        return new GnuParser().parse(options, arguments, false);
-    }
-
-    private static void addOption(Options options, WorkflowExecutionArgs arg, boolean isRequired) {
-        Option option = arg.getOption();
-        option.setRequired(isRequired);
-        options.addOption(option);
-    }
-
-    /** Equivalent to {@code create(wfProperties, Type.POST_PROCESSING)}. */
-    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) {
-        return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING);
-    }
-
-    /**
-     * Records the type as CONTEXT_TYPE in the supplied map (the map is modified) and wraps it.
-     */
-    public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties, Type type) {
-        wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name());
-        return new WorkflowExecutionContext(wfProperties);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
deleted file mode 100644
index 7bf14f2..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.falcon.FalconException;
-
-/**
- * A listener interface for workflow execution.
- */
-public interface WorkflowExecutionListener {
-
- /**
- * Invoked when a workflow succeeds.
- * @param context the execution context of the workflow the notification is about
- * @throws FalconException if the implementation fails to handle the notification
- */
- void onSuccess(WorkflowExecutionContext context) throws FalconException;
-
- /**
- * Invoked when a workflow fails.
- * @param context the execution context of the workflow the notification is about
- * @throws FalconException if the implementation fails to handle the notification
- */
- void onFailure(WorkflowExecutionContext context) throws FalconException;
-
- /**
- * Invoked on start of a workflow. Basically, when the workflow is RUNNING.
- * @param context the execution context of the workflow the notification is about
- * @throws FalconException if the implementation fails to handle the notification
- */
- void onStart(WorkflowExecutionContext context) throws FalconException;
-
- /**
- * Invoked when a workflow is suspended.
- * @param context the execution context of the workflow the notification is about
- * @throws FalconException if the implementation fails to handle the notification
- */
- void onSuspend(WorkflowExecutionContext context) throws FalconException;
-
- /**
- * Invoked when a workflow is in waiting state.
- * @param context the execution context of the workflow the notification is about
- * @throws FalconException if the implementation fails to handle the notification
- */
- void onWait(WorkflowExecutionContext context) throws FalconException;
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
deleted file mode 100644
index b692258..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ /dev/null
@@ -1,312 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.aspect.GenericAlert;
-import org.apache.falcon.entity.EntityNotRegisteredException;
-import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.SchemaHelper;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.service.FalconService;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.falcon.util.StartupProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Date;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * A workflow job end notification service.
- */
-public class WorkflowJobEndNotificationService implements FalconService {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
-
-    public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
-
-    // Insertion-ordered, so listeners are notified in registration order.
-    private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
-
-    // Maintain a cache of context built, so we don't have to query Oozie for every state change.
-    // Keyed by workflow id. ConcurrentHashMap rejects null keys and null values.
-    private Map<String, Properties> contextMap = new ConcurrentHashMap<>();
-
-    @Override
-    public String getName() {
-        return SERVICE_NAME;
-    }
-
-    // Mainly for test
-    Map<String, Properties> getContextMap() {
-        return contextMap;
-    }
-
-    /**
-     * Instantiates and registers every listener class named in the comma separated startup
-     * property {@code workflow.execution.listeners}. Blank entries are skipped; nothing is
-     * registered when the property is empty.
-     *
-     * @throws FalconException declared by {@link FalconService#init()}
-     */
-    @Override
-    public void init() throws FalconException {
-        String listenerClassNames = StartupProperties.get().getProperty(
-                "workflow.execution.listeners");
-        if (StringUtils.isEmpty(listenerClassNames)) {
-            return;
-        }
-
-        for (String listenerClassName : listenerClassNames.split(",")) {
-            listenerClassName = listenerClassName.trim();
-            if (listenerClassName.isEmpty()) {
-                continue;
-            }
-            WorkflowExecutionListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
-            registerListener(listener);
-        }
-    }
-
-    /** Removes all registered listeners. */
-    @Override
-    public void destroy() throws FalconException {
-        listeners.clear();
-    }
-
-    public void registerListener(WorkflowExecutionListener listener) {
-        listeners.add(listener);
-    }
-
-    public void unregisterListener(WorkflowExecutionListener listener) {
-        listeners.remove(listener);
-    }
-
-    /** Handles a failure notification; see {@link #notifySuccess(WorkflowExecutionContext)}. */
-    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
-        notifyWorkflowEnd(context);
-    }
-
-    /**
-     * Handles a success notification. Success and failure share one code path; the branch taken
-     * is decided from the status held in the context.
-     */
-    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
-        notifyWorkflowEnd(context);
-    }
-
-    /**
-     * Enriches the context from the workflow configuration and calls {@code onStart} on every
-     * listener. Nothing is sent when the entity is no longer registered.
-     */
-    public void notifyStart(WorkflowExecutionContext context) throws FalconException {
-        // Start notifications can only be from Oozie JMS notifications
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-        LOG.debug("Sending workflow start notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onStart(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-    }
-
-    /**
-     * Enriches the context from the workflow configuration, calls {@code onSuspend} on every
-     * listener, raises the instance alert and drops the cached properties of the workflow.
-     */
-    public void notifySuspend(WorkflowExecutionContext context) throws FalconException {
-        // Suspend notifications can only be from Oozie JMS notifications
-        if (!updateContextFromWFConf(context)) {
-            return;
-        }
-        LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onSuspend(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-
-        try {
-            instrumentAlert(context);
-        } finally {
-            // Evict the cache entry even when alerting throws, so it cannot linger.
-            contextMap.remove(context.getWorkflowId());
-        }
-    }
-
-    /** Calls {@code onWait} on every listener; the context is passed through unchanged. */
-    public void notifyWait(WorkflowExecutionContext context) throws FalconException {
-        // Wait notifications can only be from Oozie JMS notifications
-        LOG.debug("Sending workflow wait notification to listeners with context : {} ", context);
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                listener.onWait(context);
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-    }
-
-    // The method retrieves the conf from the cache if it is in cache.
-    // Else, queries WF Engine to retrieve the conf of the workflow
-    // Returns false only when the entity is not registered any more; true otherwise, whether or
-    // not any extra properties were found.
-    private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException {
-        Properties wfProps = contextMap.get(context.getWorkflowId());
-        if (wfProps == null) {
-            Entity entity;
-            try {
-                entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName());
-            } catch (EntityNotRegisteredException e) {
-                // Entity no longer exists. No need to notify.
-                LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.",
-                        context.getEntityName(), context.getEntityType());
-                contextMap.remove(context.getWorkflowId());
-                return false;
-            }
-            for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) {
-                try {
-                    InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity)
-                            .getJobDetails(cluster, context.getWorkflowId()).getInstances();
-                    if (instances != null && instances.length > 0) {
-                        wfProps = getWFProps(instances[0].getWfParams());
-                        // Required by RetryService. But, is not part of conf.
-                        wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(),
-                                Integer.toString(instances[0].getRunId()));
-                    }
-                } catch (FalconException e) {
-                    // Do Nothing. Move on to the next cluster.
-                    continue;
-                }
-                // ConcurrentHashMap.put throws NullPointerException for a null value, which
-                // used to happen whenever a cluster returned no instances.
-                if (wfProps != null) {
-                    contextMap.put(context.getWorkflowId(), wfProps);
-                }
-            }
-        }
-
-        // No extra props to enhance the context with.
-        if (wfProps == null || wfProps.isEmpty()) {
-            return true;
-        }
-
-        for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) {
-            if (wfProps.containsKey(arg.getName())) {
-                context.setValue(arg, wfProps.getProperty(arg.getName()));
-            }
-        }
-        return true;
-    }
-
-    // Copies workflow parameters into a Properties object. A null array yields an empty result
-    // and pairs with a null key or value are skipped, because Properties rejects nulls.
-    private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) {
-        Properties props = new Properties();
-        if (wfParams == null) {
-            return props;
-        }
-        for (InstancesResult.KeyValuePair kv : wfParams) {
-            if (kv == null || kv.getKey() == null || kv.getValue() == null) {
-                continue;
-            }
-            props.put(kv.getKey(), kv.getValue());
-        }
-        return props;
-    }
-
-    // This method handles both success and failure notifications.
-    private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException {
-        // Need to distinguish notification from post processing for backward compatibility
-        if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) {
-            boolean engineNotifEnabled = false;
-            try {
-                engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine()
-                        .isNotificationEnabled(context.getClusterName(), context.getWorkflowId());
-            } catch (FalconException e) {
-                LOG.debug("Received error while checking if notification is enabled. "
-                        + "Hence, assuming notification is not enabled.", e);
-            }
-            // Ignore the message from post processing as there will be one more from Oozie.
-            if (engineNotifEnabled) {
-                LOG.info("Ignoring message from post processing as engine notification is enabled.");
-                return;
-            } else {
-                updateContextWithTime(context);
-            }
-        } else {
-            if (!updateContextFromWFConf(context)) {
-                return;
-            }
-        }
-
-        LOG.debug("Sending workflow end notification to listeners with context : {} ", context);
-
-        // NOTE(review): instrumentAlert runs inside the loop, i.e. once per listener that returns
-        // normally and never when no listener is registered. notifySuspend alerts exactly once;
-        // confirm whether per-listener alerting is intended here before changing it.
-        for (WorkflowExecutionListener listener : listeners) {
-            try {
-                if (context.hasWorkflowSucceeded()) {
-                    listener.onSuccess(context);
-                    instrumentAlert(context);
-                } else {
-                    listener.onFailure(context);
-                    if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) {
-                        instrumentAlert(context);
-                    }
-                }
-            } catch (Throwable t) {
-                // do not rethrow as other listeners do not get a chance
-                LOG.error("Error in listener {}", listener.getClass().getName(), t);
-            }
-        }
-
-        contextMap.remove(context.getWorkflowId());
-    }
-
-    // In case of notifications coming from post notifications, start and end time need to be populated.
-    // Times missing from the engine's answer (including an answer with no instances at all)
-    // fall back to the current time.
-    private void updateContextWithTime(WorkflowExecutionContext context) {
-        try {
-            InstancesResult result = WorkflowEngineFactory.getWorkflowEngine()
-                    .getJobDetails(context.getClusterName(), context.getWorkflowId());
-            InstancesResult.Instance[] instances = result == null ? null : result.getInstances();
-            Date startTime = null;
-            Date endTime = null;
-            if (instances != null && instances.length > 0) {
-                startTime = instances[0].startTime;
-                endTime = instances[0].endTime;
-            }
-            Date now = new Date();
-            if (startTime == null) {
-                startTime = now;
-            }
-            if (endTime == null) {
-                endTime = now;
-            }
-            context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime()));
-            context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime()));
-        } catch(FalconException e) {
-            LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster "
-                    + context.getClusterName(), e);
-        }
-    }
-
-    // Reports the instance through GenericAlert: as failed when the status is FAILED, through
-    // the succeeded alert for every other status.
-    private void instrumentAlert(WorkflowExecutionContext context) {
-        String clusterName = context.getClusterName();
-        String entityName = context.getEntityName();
-        String entityType = context.getEntityType();
-        String operation = context.getOperation().name();
-        String workflowId = context.getWorkflowId();
-        String workflowUser = context.getWorkflowUser();
-        String nominalTime = context.getNominalTimeAsISO8601();
-        String runId = String.valueOf(context.getWorkflowRunId());
-        Date now = new Date();
-        // Start and/or End time may not be set in case of workflow suspend
-        long endMillis = readTimeOrZero(context, WorkflowExecutionArgs.WF_END_TIME);
-        Date endTime = endMillis == 0 ? now : new Date(endMillis);
-
-        long startMillis = readTimeOrZero(context, WorkflowExecutionArgs.WF_START_TIME);
-        Date startTime = startMillis == 0 ? now : new Date(startMillis);
-
-        // Millisecond difference scaled by 1,000,000 - presumably nanoseconds; confirm against
-        // the unit GenericAlert expects.
-        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
-        if (context.hasWorkflowFailed()) {
-            GenericAlert.instrumentFailedInstance(clusterName, entityType,
-                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                    SchemaHelper.formatDateUTC(startTime), "", "", duration);
-        } else {
-            GenericAlert.instrumentSucceededInstance(clusterName, entityType,
-                    entityName, nominalTime, workflowId, workflowUser, runId, operation,
-                    SchemaHelper.formatDateUTC(startTime), duration);
-        }
-    }
-
-    // Reads a numeric time argument straight from the context; an absent or blank value maps
-    // to 0 so that "not set" does not surface as a NumberFormatException.
-    private static long readTimeOrZero(WorkflowExecutionContext context, WorkflowExecutionArgs arg) {
-        String value = context.getValue(arg);
-        return StringUtils.isBlank(value) ? 0L : Long.parseLong(value);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
deleted file mode 100644
index 4d8402a..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.engine;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.LifeCycle;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.cluster.Cluster;
-import org.apache.falcon.resource.InstancesResult;
-import org.apache.falcon.resource.InstancesSummaryResult;
-
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-
-/**
- * Workflow engine should minimally support the
- * following operations.
- *
- * <p>Listeners passed to {@link #registerListener} are held in a {@link HashSet},
- * so registering an equal listener more than once keeps a single entry.
- */
-public abstract class AbstractWorkflowEngine {
-
- public static final String NAME_NODE = "nameNode";
- public static final String JOB_TRACKER = "jobTracker";
-
- protected Set<WorkflowEngineActionListener> listeners = new HashSet<WorkflowEngineActionListener>();
-
- public void registerListener(WorkflowEngineActionListener listener) {
- listeners.add(listener);
- }
-
- public abstract boolean isAlive(Cluster cluster) throws FalconException;
-
- public abstract void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties)
- throws FalconException;
-
- public abstract String suspend(Entity entity) throws FalconException;
-
- public abstract String resume(Entity entity) throws FalconException;
-
- public abstract String delete(Entity entity) throws FalconException;
-
- public abstract String delete(Entity entity, String cluster) throws FalconException;
-
- public abstract String reRun(String cluster, String wfId, Properties props, boolean isForced)
- throws FalconException;
-
- public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException;
-
- public abstract boolean isActive(Entity entity) throws FalconException;
-
- public abstract boolean isSuspended(Entity entity) throws FalconException;
-
- public abstract boolean isCompleted(Entity entity) throws FalconException;
-
- public abstract InstancesResult getRunningInstances(Entity entity,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props,
- List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException;
-
- public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract InstancesResult getStatus(Entity entity, Date start, Date end,
- List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException;
-
- public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract String update(Entity oldEntity, Entity newEntity,
- String cluster, Boolean skipDryRun) throws FalconException;
-
- public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException;
-
- public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException;
-
- public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException;
-
- public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException;
-
- public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end,
- List<LifeCycle> lifeCycles) throws FalconException;
-
- public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException;
-
- public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException;
-
-
- /**
- * Returns the short name of the Workflow Engine.
- * @return the short name of this workflow engine
- */
- public abstract String getName();
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java b/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
deleted file mode 100644
index 2a1cbd4..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.engine;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-
-/**
- * Listener that will be notified before and after
- * workflow life cycle operations are performed.
- *
- * <p>There is one before/after callback pair for each of schedule, delete,
- * suspend and resume. Every callback receives the entity and a cluster
- * String (presumably the cluster name -- confirm against callers) and may
- * throw {@link FalconException}.
- */
-public interface WorkflowEngineActionListener {
-
- void beforeSchedule(Entity entity, String cluster) throws FalconException;
-
- void afterSchedule(Entity entity, String cluster) throws FalconException;
-
- void beforeDelete(Entity entity, String cluster) throws FalconException;
-
- void afterDelete(Entity entity, String cluster) throws FalconException;
-
- void beforeSuspend(Entity entity, String cluster) throws FalconException;
-
- void afterSuspend(Entity entity, String cluster) throws FalconException;
-
- void beforeResume(Entity entity, String cluster) throws FalconException;
-
- void afterResume(Entity entity, String cluster) throws FalconException;
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
deleted file mode 100644
index 3f07c3c..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Shell;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * Utility to read oozie action conf at oozie.action.conf.xml.
- */
-public final class OozieActionConfigurationHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(OozieActionConfigurationHelper.class);
-
-    private OozieActionConfigurationHelper() {
-    }
-
-    /**
-     * Creates a new {@link Configuration} for use from within an Oozie action.
-     *
-     * <p>The file named by the {@code oozie.action.conf.xml} system property is added
-     * as a resource when it exists. If the {@code HADOOP_TOKEN_FILE_LOCATION}
-     * environment variable is set, its value is stored under
-     * {@code mapreduce.job.credentials.binary} and {@code tez.credentials.path}, both
-     * in the returned configuration and as system properties. On Windows one leading
-     * and one trailing double quote are stripped from that value first.
-     *
-     * @return the populated configuration
-     * @throws IOException if the file system lookup or the configuration dump fails
-     */
-    public static Configuration createActionConf() throws IOException {
-        Configuration conf = new Configuration();
-        Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml"));
-
-        final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath);
-        LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists);
-        if (actionConfExists) {
-            LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString());
-            conf.addResource(confPath);
-            dumpConf(conf, "oozie action conf ");
-        }
-
-        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
-        if (tokenFile != null) {
-            if (Shell.WINDOWS) {
-                // startsWith/endsWith are safe on an empty value, whereas charAt(0) and
-                // charAt(length - 1) throw StringIndexOutOfBoundsException for "" or a lone quote.
-                if (tokenFile.startsWith("\"")) {
-                    tokenFile = tokenFile.substring(1);
-                }
-                if (tokenFile.endsWith("\"")) {
-                    tokenFile = tokenFile.substring(0, tokenFile.length() - 1);
-                }
-            }
-
-            conf.set("mapreduce.job.credentials.binary", tokenFile);
-            System.setProperty("mapreduce.job.credentials.binary", tokenFile);
-            conf.set("tez.credentials.path", tokenFile);
-            System.setProperty("tez.credentials.path", tokenFile);
-        }
-
-        conf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
-        conf.setBoolean("hive.exec.mode.local.auto", false);
-
-        return conf;
-    }
-
-    /**
-     * Logs the given configuration at INFO level, prefixed by {@code message}.
-     *
-     * @param conf    configuration to dump
-     * @param message text logged in front of the dump
-     * @throws IOException if writing the dump fails
-     */
-    public static void dumpConf(Configuration conf, String message) throws IOException {
-        StringWriter writer = new StringWriter();
-        Configuration.dumpConfiguration(conf, writer);
-        // Pass the message as an argument so any "{}" inside it is not read as a placeholder.
-        LOG.info("{} {}", message, writer);
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
deleted file mode 100644
index 05f248e..0000000
--- a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.workflow.util;
-
-/**
- * Oozie Constants used across multiple modules.
- *
- * <p>Constant holder only: the class is final and its sole constructor is private.
- */
-public final class OozieConstants {
- /**
- * Constant for an Oozie instance running locally.
- */
- public static final String LOCAL_OOZIE = "localoozie";
-
- private OozieConstants() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
deleted file mode 100644
index 75c8267..0000000
--- a/common/src/main/resources/log4j.xml
+++ /dev/null
@@ -1,86 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
-
-<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
- <appender name="console" class="org.apache.log4j.ConsoleAppender">
- <param name="Target" value="System.out"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
- </layout>
- </appender>
-
- <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/target/logs/application.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
- </layout>
- </appender>
-
- <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/target/logs/audit.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %x %m%n"/>
- </layout>
- </appender>
-
- <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${user.dir}/target/logs/metric.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %m%n"/>
- </layout>
- </appender>
-
- <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %m%n"/>
- </layout>
- </appender>
-
- <logger name="org.apache.falcon" additivity="false">
- <level value="debug"/>
- <appender-ref ref="FILE"/>
- </logger>
-
- <logger name="AUDIT">
- <level value="info"/>
- <appender-ref ref="AUDIT"/>
- </logger>
-
- <logger name="METRIC">
- <level value="info"/>
- <appender-ref ref="METRIC"/>
- </logger>
-
- <root>
- <priority value="info"/>
- <appender-ref ref="console"/>
- </root>
-
-</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/runtime.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties
deleted file mode 100644
index 643559e..0000000
--- a/common/src/main/resources/runtime.properties
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-*.falcon.parentworkflow.retry.max=3
-*.falcon.parentworkflow.retry.interval.secs=1
-
-*.falcon.replication.workflow.maxmaps=5
-*.falcon.replication.workflow.mapbandwidth=100
-*.webservices.default.results.per.page=10
-
-# If true, do not run retention past feedCluster validity end time.
-# This will retain recent instances beyond feedCluster validity end time.
-*.falcon.retention.keep.instances.beyond.validity=true
-
-# Default configs to handle replication for late arriving feeds.
-*.feed.late.allowed=true
-*.feed.late.frequency=hours(3)
-*.feed.late.policy=exp-backoff
-
-# If true, Falcon skips oozie dryrun while scheduling entities.
-*.falcon.skip.dryrun=false
-
-######### Proxyuser Configuration Start #########
-
-#List of hosts the '#USER#' user is allowed to perform 'doAs' operations from. The '#USER#' must be replaced with the
-#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
-#comma separated hostnames
-
-*.falcon.service.ProxyUserService.proxyuser.#USER#.hosts=*
-
-#List of groups the '#USER#' user is allowed to perform 'doAs' operations. The '#USER#' must be replaced with the
-#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of
-#comma separated groups
-
-*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*
-
-######### Proxyuser Configuration End #########
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/startup.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties
deleted file mode 100644
index 2497cce..0000000
--- a/common/src/main/resources/startup.properties
+++ /dev/null
@@ -1,306 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-######### Implementation classes #########
-## DONT MODIFY UNLESS SURE ABOUT CHANGE ##
-
-*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine
-*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory
-*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder
-*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder
-*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager
-*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService
-*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager
-*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService
-
-##### Falcon Services #####
-*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\
- org.apache.falcon.workflow.WorkflowJobEndNotificationService, \
- org.apache.falcon.service.ProcessSubscriberService,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
- org.apache.falcon.service.LifecyclePolicyMap,\
- org.apache.falcon.entity.store.ConfigurationStore,\
- org.apache.falcon.rerun.service.RetryService,\
- org.apache.falcon.rerun.service.LateRunService,\
- org.apache.falcon.metadata.MetadataMappingService,\
- org.apache.falcon.service.LogCleanupService,\
- org.apache.falcon.service.GroupsService,\
- org.apache.falcon.service.ProxyUserService,\
- org.apache.falcon.adfservice.ADFProviderService
-## If you wish to use Falcon native scheduler add the commented out services below to application.services ##
-# org.apache.falcon.notification.service.impl.JobCompletionService,\
-# org.apache.falcon.notification.service.impl.SchedulerService,\
-# org.apache.falcon.notification.service.impl.AlarmService,\
-# org.apache.falcon.notification.service.impl.DataAvailabilityService,\
-# org.apache.falcon.execution.FalconExecutionService,\
-# org.apache.falcon.state.store.service.FalconJPAService
-
-
-# List of Lifecycle policies configured.
-*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete
-# List of builders for the policies.
-*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder
-##### Falcon Configuration Store Change listeners #####
-*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\
- org.apache.falcon.entity.ColoClusterRelation,\
- org.apache.falcon.group.FeedGroupMap,\
- org.apache.falcon.entity.store.FeedLocationStore,\
- org.apache.falcon.service.FeedSLAMonitoringService,\
- org.apache.falcon.service.SharedLibraryHostingService
-## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ##
-# org.apache.falcon.state.store.jdbc.JDBCStateStore
-
-##### JMS MQ Broker Implementation class #####
-*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory
-
-##### List of shared libraries for Falcon workflows #####
-*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3
-
-##### Workflow Job Execution Completion listeners #####
-*.workflow.execution.listeners=
-
-######### Implementation classes #########
-
-
-######### System startup parameters #########
-
-# Location of libraries that are shipped to Hadoop
-*.system.lib.location=${FALCON_HOME}/sharedlibs
-
-# Location to store user entity configurations
-
-#Configurations used in UTs
-debug.config.store.uri=file://${user.dir}/target/store
-#Location to store state of Feed SLA monitoring service
-debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances
-debug.config.oozie.conf.uri=${user.dir}/target/oozie
-debug.system.lib.location=${system.lib.location}
-debug.broker.url=vm://localhost
-debug.retry.recorder.path=${user.dir}/target/retry
-debug.libext.feed.retention.paths=${falcon.libext}
-debug.libext.feed.replication.paths=${falcon.libext}
-debug.libext.process.paths=${falcon.libext}
-
-#Configurations used in ITs
-it.config.store.uri=file://${user.dir}/target/store
-it.config.oozie.conf.uri=${user.dir}/target/oozie
-it.system.lib.location=${system.lib.location}
-it.broker.url=tcp://localhost:61616
-it.retry.recorder.path=${user.dir}/target/retry
-it.libext.feed.retention.paths=${falcon.libext}
-it.libext.feed.replication.paths=${falcon.libext}
-it.libext.process.paths=${falcon.libext}
-it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler
-
-*.falcon.cleanup.service.frequency=minutes(5)
-
-######### Properties for Feed SLA Monitoring #########
-# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour
-*.feed.sla.serialization.frequency.millis=3600000
-
-# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in
-# a FIFO fashion.
-*.feed.sla.queue.size=288
-
-# Do not change unless really sure
-# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60
-*.feed.sla.statusCheck.frequency.seconds=600
-
-# Do not change unless really sure
-# Time Duration (in milliseconds) in future for generating pending feed instances.
-# In every cycle pending feed instances are added for monitoring, till this time in future.
-# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000
-*.feed.sla.lookAheadWindow.millis=900000
-
-
-######### Properties for configuring JMS provider - activemq #########
-# Default Active MQ url
-*.broker.url=tcp://localhost:61616
-
-# default time-to-live for a JMS message 3 days (time in minutes)
-*.broker.ttlInMins=4320
-*.entity.topic=FALCON.ENTITY.TOPIC
-*.max.retry.failure.count=1
-*.retry.recorder.path=${user.dir}/logs/retry
-
-######### Properties for configuring iMon client and metric #########
-*.internal.queue.size=1000
-
-
-######### Graph Database Properties #########
-# Graph implementation
-*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory
-
-# Graph Storage
-*.falcon.graph.storage.directory=${user.dir}/target/graphdb
-*.falcon.graph.storage.backend=berkeleyje
-*.falcon.graph.serialize.path=${user.dir}/target/graphdb
-*.falcon.graph.preserve.history=false
-*.falcon.graph.transaction.retry.count=3
-*.falcon.graph.transaction.retry.delay=5
-
-# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You
-# can use other reporters like ganglia also.
-# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the
-# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph."
-# *.falcon.graph.storage.enable-basic-metrics = true
-# Required; IP or hostname string
-# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1
-# Required; specify logging interval in milliseconds
-# *.falcon.graph.metrics.graphite.interval = 60000
-
-######### Authentication Properties #########
-
-# Authentication type must be specified: simple|kerberos
-*.falcon.authentication.type=simple
-
-##### Service Configuration
-
-# Indicates the Kerberos principal to be used in Falcon Service.
-*.falcon.service.authentication.kerberos.principal=
-
-# Location of the keytab file with the credentials for the Service principal.
-*.falcon.service.authentication.kerberos.keytab=
-
-# name node principal to talk to config store
-*.dfs.namenode.kerberos.principal=
-
-##### SPNEGO Configuration
-
-# Authentication type must be specified: simple|kerberos|<class>
-# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility
-*.falcon.http.authentication.type=simple
-
-# Indicates how long (in seconds) an authentication token is valid before it has to be renewed.
-*.falcon.http.authentication.token.validity=36000
-
-# The signature secret for signing the authentication tokens.
-*.falcon.http.authentication.signature.secret=falcon
-
-# The domain to use for the HTTP cookie that stores the authentication token.
-*.falcon.http.authentication.cookie.domain=
-
-# Indicates if anonymous requests are allowed when using 'simple' authentication.
-*.falcon.http.authentication.simple.anonymous.allowed=false
-
-# Indicates the Kerberos principal to be used for HTTP endpoint.
-# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
-*.falcon.http.authentication.kerberos.principal=
-
-# Location of the keytab file with the credentials for the HTTP principal.
-*.falcon.http.authentication.kerberos.keytab=
-
-# The Kerberos name rules used to resolve Kerberos principal names; refer to Hadoop's KerberosName for more details.
-*.falcon.http.authentication.kerberos.name.rules=DEFAULT
-
-# Comma separated list of black listed users
-*.falcon.http.authentication.blacklisted.users=
-
-######### Authentication Properties #########
-
-
-######### Authorization Properties #########
-
-# Authorization Enabled flag: false (default)|true
-*.falcon.security.authorization.enabled=false
-
-# The name of the group of super-users
-*.falcon.security.authorization.superusergroup=falcon
-
-# Admin Users, comma separated users
-*.falcon.security.authorization.admin.users=falcon,ambari-qa
-
-# Admin Group Membership, comma separated groups
-*.falcon.security.authorization.admin.groups=falcon,staff
-
-# Authorization Provider Implementation Fully Qualified Class Name
-*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider
-
-######### Authorization Properties #########
-
-######### ADF Configurations start #########
-
-# A String object that represents the namespace
-*.microsoft.windowsazure.services.servicebus.namespace=
-
-# Request and status queues on the namespace
-*.microsoft.windowsazure.services.servicebus.requestqueuename=
-*.microsoft.windowsazure.services.servicebus.statusqueuename=
-
-# A String object that contains the SAS key name
-*.microsoft.windowsazure.services.servicebus.sasKeyName=
-
-# A String object that contains the SAS key
-*.microsoft.windowsazure.services.servicebus.sasKey=
-
-# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
-# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
-*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=
-
-# Service bus polling frequency
-*.microsoft.windowsazure.services.servicebus.polling.frequency=
-
-# Super user
-*.microsoft.windowsazure.services.servicebus.superuser=
-
-######### ADF Configurations end ###########
-
-######### SMTP Properties ########
-
-# Setting SMTP hostname
-#*.falcon.email.smtp.host=localhost
-
-# Setting SMTP port number
-#*.falcon.email.smtp.port=25
-
-# Setting email from address
-#*.falcon.email.from.address=falcon@localhost
-
-# Setting email Auth
-#*.falcon.email.smtp.auth=false
-
-#Setting user name
-#*.falcon.email.smtp.user=""
-
-#Setting password
-#*.falcon.email.smtp.password=""
-
-# Setting monitoring plugin, if SMTP parameters are defined
-#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\
-# org.apache.falcon.plugin.EmailNotificationPlugin
-
-######### StateStore Properties #####
-#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
-#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-## Maximum number of active connections that can be allocated from this pool at the same time.
-#*.falcon.statestore.pool.max.active.conn=10
-#*.falcon.statestore.connection.properties=
-## Indicates the interval (in milliseconds) between eviction runs.
-#*.falcon.statestore.validate.db.connection.eviction.interval=300000
-## The number of objects to examine during each run of the idle object evictor thread.
-#*.falcon.statestore.validate.db.connection.eviction.num=10
-## Creates Falcon DB.
-## If set to true, it creates the DB schema if it does not exist. If the DB schema exists, it is a no-op.
-## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.credentials
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials
deleted file mode 100644
index 86c32a1..0000000
--- a/common/src/main/resources/statestore.credentials
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-######### StateStore Credentials #####
-#*.falcon.statestore.jdbc.username=sa
-#*.falcon.statestore.jdbc.password=
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.properties
----------------------------------------------------------------------
diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties
deleted file mode 100644
index 44e79b3..0000000
--- a/common/src/main/resources/statestore.properties
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-*.domain=debug
-
-######### StateStore Properties #####
-#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore
-#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver
-## Falcon currently supports derby, mysql and postgreSQL, change url based on DB.
-#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true
-
-## StateStore credentials file where username,password and other properties can be stored securely.
-## Set the permission of this credentials file to 400 and make sure the user who starts falcon has only read permission on it.
-## Give the absolute path to the credentials file, including the file name, or put a file named statestore.credentials in the classpath.
-## Credentials file should be present either in given location or class path, otherwise falcon won't start.
-#*.falcon.statestore.credentials.file=
-
-#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource
-## Maximum number of active connections that can be allocated from this pool at the same time.
-#*.falcon.statestore.pool.max.active.conn=10
-## Any additional connection properties that need to be used, specified as comma separated key=value pairs.
-#*.falcon.statestore.connection.properties=
-## Indicates the interval (in milliseconds) between eviction runs.
-#*.falcon.statestore.validate.db.connection.eviction.interval=300000
-## The number of objects to examine during each run of the idle object evictor thread.
-#*.falcon.statestore.validate.db.connection.eviction.num=10
-## Creates Falcon DB.
-## If set to true, it creates the DB schema if it does not exist. If the DB schema exists, it is a NOP.
-## If set to false, it does not create the DB schema. If the DB schema does not exist, startup fails.
-#*.falcon.statestore.create.db.schema=true
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
deleted file mode 100644
index 0df59b2..0000000
--- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.cleanup;
-
-import org.apache.falcon.FalconException;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
-import org.apache.falcon.entity.AbstractTestBase;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.security.CurrentUser;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Test for log cleanup service.
- *
- * <p>The fixture creates process and feed log directories on two embedded clusters
- * ("testCluster" and "backupCluster") and then checks which of those directories are
- * still present after {@link ProcessCleanupHandler} and {@link FeedCleanupHandler} run.
- */
-public class LogCleanupServiceTest extends AbstractTestBase {
-
-    /** File system of the "testCluster" embedded cluster. */
-    private FileSystem fs;
-    /** File system of the "backupCluster" embedded cluster. */
-    private FileSystem tfs;
-    private EmbeddedCluster targetDfsCluster;
-
-    // Log directories of process "sample"; the tests expect cleanup to remove them.
-    private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-01-00/000");
-    private final Path instanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-01-00/001");
-    private final Path instanceLogPath2 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/job-2010-01-01-02-00/001");
-    // Log directory of process "sample2" (frequency days(1)); the tests expect it to survive cleanup.
-    private final Path instanceLogPath3 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample2" + "/logs/job-2010-01-01-01-00/000");
-    // Late-data log directory of process "sample"; created in setup but not asserted on by any test.
-    private final Path instanceLogPath4 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample" + "/logs/latedata/2010-01-01-01-00");
-    // Log directory of process "sample3" (published without an ACL); expected to be removed.
-    private final Path instanceLogPath5 = new Path("/projects/falcon/staging/falcon/workflows/process/"
-        + "sample3" + "/logs/job-2010-01-01-01-00/000");
-    // Feed log directories; only "impressionFeed" is stored as an entity in setup.
-    private final Path feedInstanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/feed/"
-        + "impressionFeed" + "/logs/job-2010-01-01-01-00/testCluster/000");
-    private final Path feedInstanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/feed/"
-        + "impressionFeed2" + "/logs/job-2010-01-01-01-00/testCluster/000");
-
-
-    /**
-     * Shuts down both embedded clusters started in {@link #setup()}.
-     */
-    @AfterClass
-    public void tearDown() {
-        this.dfsCluster.shutdown();
-        this.targetDfsCluster.shutdown();
-    }
-
-    /**
-     * Starts the two embedded clusters, stores the cluster, feed and process entities and
-     * creates the log directories and files that the tests assert on.
-     *
-     * @throws Exception if a cluster cannot be started or an entity or path cannot be created
-     */
-    @Override
-    @BeforeClass
-    public void setup() throws Exception {
-        this.dfsCluster = EmbeddedCluster.newCluster("testCluster", CurrentUser.getUser());
-        conf = dfsCluster.getConf();
-        fs = dfsCluster.getFileSystem();
-        // Start from an empty file system.
-        fs.delete(new Path("/"), true);
-
-        storeEntity(EntityType.CLUSTER, "testCluster");
-        // NOTE(review): presumably gives the second cluster its own data directory - confirm.
-        System.setProperty("test.build.data", "target/tdfs/data" + System.currentTimeMillis());
-        this.targetDfsCluster = EmbeddedCluster.newCluster("backupCluster");
-        conf = targetDfsCluster.getConf();
-
-        storeEntity(EntityType.CLUSTER, "backupCluster");
-        storeEntity(EntityType.FEED, "impressionFeed");
-        storeEntity(EntityType.FEED, "clicksFeed");
-        storeEntity(EntityType.FEED, "imp-click-join1");
-        storeEntity(EntityType.FEED, "imp-click-join2");
-        storeEntity(EntityType.PROCESS, "sample");
-
-        // Derive two more processes from "sample": one with a daily frequency, one without an ACL.
-        Process process = ConfigurationStore.get().get(EntityType.PROCESS, "sample");
-        Process otherProcess = (Process) process.copy();
-        otherProcess.setName("sample2");
-        otherProcess.setFrequency(new Frequency("days(1)"));
-        Process noACLProcess = (Process) process.copy();
-        noACLProcess.setName("sample3");
-        noACLProcess.setACL(null);
-        // Remove any previously stored entity of the same name before publishing.
-        ConfigurationStore.get().remove(EntityType.PROCESS,
-            otherProcess.getName());
-        ConfigurationStore.get().publish(EntityType.PROCESS, otherProcess);
-        ConfigurationStore.get().remove(EntityType.PROCESS,
-            noACLProcess.getName());
-        ConfigurationStore.get().publish(EntityType.PROCESS, noACLProcess);
-
-        fs.mkdirs(instanceLogPath);
-        fs.mkdirs(instanceLogPath1);
-        fs.mkdirs(instanceLogPath2);
-        fs.mkdirs(instanceLogPath3);
-        fs.mkdirs(instanceLogPath4);
-        fs.mkdirs(instanceLogPath5);
-
-        // fs.setTimes won't work on directories, so create files inside the log directory.
-        fs.createNewFile(new Path(instanceLogPath, "oozie.log"));
-        fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log"));
-
-        tfs = targetDfsCluster.getFileSystem();
-        tfs.delete(new Path("/"), true);
-        // Feed logs are created on both clusters.
-        fs.mkdirs(feedInstanceLogPath);
-        fs.mkdirs(feedInstanceLogPath1);
-        tfs.mkdirs(feedInstanceLogPath);
-        tfs.mkdirs(feedInstanceLogPath1);
-        fs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
-        tfs.createNewFile(new Path(feedInstanceLogPath, "oozie.log"));
-
-        // table feed staging dir setup
-        initializeStagingDirs();
-        // NOTE(review): presumably lets the created logs age before cleanup runs - confirm.
-        Thread.sleep(1000);
-    }
-
-    /**
-     * Unmarshals the hive table feed definition from the classpath and publishes it to the store.
-     *
-     * @throws Exception if the definition cannot be unmarshalled or published
-     */
-    private void initializeStagingDirs() throws Exception {
-        final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml");
-        final Feed tableFeed;
-        try {
-            tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream);
-        } finally {
-            // Release the classpath resource even when unmarshalling fails.
-            if (inputStream != null) {
-                inputStream.close();
-            }
-        }
-        getStore().publish(EntityType.FEED, tableFeed);
-    }
-
-    /**
-     * Verifies that process cleanup removes the logs of "sample" and "sample3" and keeps
-     * the logs of "sample2".
-     */
-    @Test
-    public void testProcessLogs() throws IOException, FalconException, InterruptedException {
-
-        Assert.assertTrue(fs.exists(instanceLogPath));
-        Assert.assertTrue(fs.exists(instanceLogPath1));
-        Assert.assertTrue(fs.exists(instanceLogPath2));
-        Assert.assertTrue(fs.exists(instanceLogPath3));
-        // Precondition for the post-cleanup check below; without it that check could pass vacuously.
-        Assert.assertTrue(fs.exists(instanceLogPath5));
-
-        AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
-        processCleanupHandler.cleanup();
-
-        Assert.assertFalse(fs.exists(instanceLogPath));
-        Assert.assertFalse(fs.exists(instanceLogPath1));
-        Assert.assertFalse(fs.exists(instanceLogPath2));
-        Assert.assertFalse(fs.exists(instanceLogPath5));
-        // NOTE(review): "sample2" runs with frequency days(1); presumably its logs are still
-        // within retention - confirm against the cleanup handler.
-        Assert.assertTrue(fs.exists(instanceLogPath3));
-    }
-
-    /**
-     * Verifies that feed cleanup removes the "impressionFeed" logs on both clusters and
-     * leaves the "impressionFeed2" logs in place.
-     */
-    @Test
-    public void testFeedLogs() throws IOException, FalconException, InterruptedException {
-
-        Assert.assertTrue(fs.exists(feedInstanceLogPath));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath));
-        Assert.assertTrue(fs.exists(feedInstanceLogPath1));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-
-        AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
-        feedCleanupHandler.cleanup();
-
-        Assert.assertFalse(fs.exists(feedInstanceLogPath));
-        Assert.assertFalse(tfs.exists(feedInstanceLogPath));
-        Assert.assertTrue(fs.exists(feedInstanceLogPath1));
-        Assert.assertTrue(tfs.exists(feedInstanceLogPath1));
-    }
-}