You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ar...@apache.org on 2014/08/21 19:30:49 UTC
[07/18] git commit: FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam
FALCON-583. Post processing is broken in current trunk. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/07397774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/07397774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/07397774
Branch: refs/heads/FALCON-585
Commit: 0739777413ac8487a76b9983aa23ecb6cf42104d
Parents: a05a288
Author: Suhas V <su...@inmobi.com>
Authored: Tue Aug 19 12:38:29 2014 +0530
Committer: Suhas V <su...@inmobi.com>
Committed: Tue Aug 19 12:38:29 2014 +0530
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/falcon/entity/FeedHelper.java | 5 +-
.../workflow/WorkflowExecutionContext.java | 8 +-
.../WorkflowJobEndNotificationService.java | 18 ++++-
.../workflow/WorkflowExecutionContextTest.java | 2 +-
.../falcon/messaging/JMSMessageProducer.java | 12 ---
.../falcon/messaging/FeedProducerTest.java | 4 +-
.../messaging/JMSMessageConsumerTest.java | 2 +-
.../messaging/JMSMessageProducerTest.java | 4 +-
.../falcon/messaging/ProcessProducerTest.java | 4 +-
.../falcon/oozie/OozieCoordinatorBuilder.java | 79 +++++++++++++-------
.../OozieOrchestrationWorkflowBuilder.java | 23 ++++--
.../feed/FeedReplicationCoordinatorBuilder.java | 14 +++-
.../feed/FeedReplicationWorkflowBuilder.java | 13 +---
.../feed/FeedRetentionCoordinatorBuilder.java | 12 ++-
.../feed/FeedRetentionWorkflowBuilder.java | 5 +-
.../ProcessExecutionCoordinatorBuilder.java | 9 ++-
.../ProcessExecutionWorkflowBuilder.java | 8 +-
.../src/main/resources/action/post-process.xml | 6 +-
.../feed/OozieFeedWorkflowBuilderTest.java | 69 ++++++++---------
.../falcon/oozie/process/AbstractTestBase.java | 51 +++++++++++++
.../OozieProcessWorkflowBuilderTest.java | 74 ++++++++----------
.../workflow/FalconPostProcessingTest.java | 4 +-
.../falcon/rerun/handler/LateRerunHandler.java | 2 +
24 files changed, 260 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b4dfdc4..0fc3608 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -58,6 +58,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-583 Post processing is broken in current trunk
+ (Venkatesh Seetharam via Suhas Vasu)
+
FALCON-582 Latest changes to LICENSE files results in build failure
(Srikanth Sundarrajan via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
index d09a12f..323188d 100644
--- a/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/FeedHelper.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Property;
@@ -275,9 +276,9 @@ public final class FeedHelper {
+ storage.getTable();
}
- public static Properties getUserWorkflowProperties(String policy) {
+ public static Properties getUserWorkflowProperties(LifeCycle lifeCycle) {
Properties props = new Properties();
- props.put("userWorkflowName", policy + "-policy");
+ props.put("userWorkflowName", lifeCycle.name().toLowerCase() + "-policy");
props.put("userWorkflowEngine", "falcon");
String version;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 637cc3e..786e94f 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -50,7 +50,7 @@ public class WorkflowExecutionContext {
private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class);
- public static final String PROCESS_INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
+ public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time
public static final String OUTPUT_FEED_SEPARATOR = ",";
public static final String INPUT_FEED_SEPARATOR = "#";
@@ -145,10 +145,10 @@ public class WorkflowExecutionContext {
* @return a ISO8601 formatted string
*/
public String getNominalTimeAsISO8601() {
- return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), PROCESS_INSTANCE_FORMAT);
+ return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT);
}
- public String getTimestamp() {
+ String getTimestamp() {
return getValue(WorkflowExecutionArgs.TIMESTAMP);
}
@@ -157,7 +157,7 @@ public class WorkflowExecutionContext {
* @return a ISO8601 formatted string
*/
public String getTimeStampAsISO8601() {
- return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), PROCESS_INSTANCE_FORMAT);
+ return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT);
}
public String getClusterName() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index 39237f9..67f6c79 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -27,6 +27,8 @@ import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.LinkedHashSet;
@@ -37,6 +39,8 @@ import java.util.Set;
*/
public class WorkflowJobEndNotificationService implements FalconService {
+ private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class);
+
public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName();
private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>();
@@ -75,7 +79,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
for (WorkflowExecutionListener listener : listeners) {
- listener.onFailure(context);
+ try {
+ listener.onFailure(context);
+ } catch (Throwable t) {
+ // do not rethrow as other listeners do not get a chance
+ LOG.error("Error in listener {}", listener.getClass().getName(), t);
+ }
}
instrumentAlert(context);
@@ -83,7 +92,12 @@ public class WorkflowJobEndNotificationService implements FalconService {
public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
for (WorkflowExecutionListener listener : listeners) {
- listener.onSuccess(context);
+ try {
+ listener.onSuccess(context);
+ } catch (Throwable t) {
+ // do not rethrow as other listeners do not get a chance
+ LOG.error("Error in listener {}", listener.getClass().getName(), t);
+ }
}
instrumentAlert(context);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
index 93d8831..e97175e 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowExecutionContextTest.java
@@ -52,7 +52,7 @@ public class WorkflowExecutionContextTest {
private static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String ISO8601_TIME = SchemaHelper.formatDateUTCToISO8601(
- NOMINAL_TIME, WorkflowExecutionContext.PROCESS_INSTANCE_FORMAT);
+ NOMINAL_TIME, WorkflowExecutionContext.INSTANCE_FORMAT);
private WorkflowExecutionContext context;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index aeadf5c..39d6fab 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,7 +18,6 @@
package org.apache.falcon.messaging;
-import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
@@ -213,7 +212,6 @@ public class JMSMessageProducer {
}
change(message, WorkflowExecutionArgs.FEED_INSTANCE_PATHS, feedPaths[i]);
- convertDateFormat(message);
messages.add(message);
}
@@ -320,16 +318,6 @@ public class JMSMessageProducer {
return message;
}
- public void convertDateFormat(Map<String, String> message) {
- String date = message.get(WorkflowExecutionArgs.NOMINAL_TIME.getName());
- change(message, WorkflowExecutionArgs.NOMINAL_TIME,
- SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm"));
-
- date = message.get(WorkflowExecutionArgs.TIMESTAMP.getName());
- change(message, WorkflowExecutionArgs.TIMESTAMP,
- SchemaHelper.formatDateUTCToISO8601(date, "yyyy-MM-dd-HH-mm"));
- }
-
@SuppressWarnings("unchecked")
private Connection createAndStartConnection(String implementation, String userName,
String password, String url)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
index 9119624..1c10be5 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/FeedProducerTest.java
@@ -219,9 +219,9 @@ public class FeedProducerTest {
"falcon");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
- "2011-01-01T01:00Z");
+ "2011-01-01-01-00");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
- "2012-01-01T01:00Z");
+ "2012-01-01-01-00");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 694d488..b1f8271 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -86,7 +86,7 @@ public class JMSMessageConsumerTest {
}
WorkflowExecutionContext context = WorkflowExecutionContext.create(
- getMockFalconMessage(15), WorkflowExecutionContext.Type.POST_PROCESSING);
+ getMockFalconMessage(5), WorkflowExecutionContext.Type.POST_PROCESSING);
context.serialize(WorkflowExecutionContext.getFilePath("/tmp/log", "process1"));
MapMessage mapMessage = session.createMapMessage();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
index 90efa3e..34cff77 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageProducerTest.java
@@ -210,9 +210,9 @@ public class JMSMessageProducerTest {
Assert.assertEquals(message.getString(WorkflowExecutionArgs.ENTITY_NAME.getName()),
"agg-coord");
Assert.assertEquals(message.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
- "2011-01-01T01:00Z");
+ "2011-01-01-01-00");
Assert.assertEquals(message.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
- "2012-01-01T01:00Z");
+ "2012-01-01-01-00");
Assert.assertEquals(message.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
index 95a3780..ccb47df 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/ProcessProducerTest.java
@@ -160,9 +160,9 @@ public class ProcessProducerTest {
"falcon");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.RUN_ID.getName()), "1");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()),
- "2011-01-01T01:00Z");
+ "2011-01-01-01-00");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()),
- "2012-01-01T01:00Z");
+ "2012-01-01-01-00");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
index 459307c..fe2136b 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieCoordinatorBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
@@ -36,6 +37,7 @@ import org.apache.falcon.oozie.process.ProcessExecutionCoordinatorBuilder;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.OozieClient;
@@ -55,11 +57,19 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
protected static final String MR_JOB_PRIORITY = "jobPriority";
protected static final String IGNORE = "IGNORE";
- protected final Tag lifecycle;
+ protected final LifeCycle lifecycle;
- public OozieCoordinatorBuilder(T entity, Tag tag) {
+ public OozieCoordinatorBuilder(T entity, LifeCycle lifecycle) {
super(entity);
- this.lifecycle = tag;
+ this.lifecycle = lifecycle;
+ }
+
+ public LifeCycle getLifecycle() {
+ return lifecycle;
+ }
+
+ public Tag getTag() {
+ return lifecycle.getTag();
}
public static OozieCoordinatorBuilder get(Entity entity, Tag tag) {
@@ -87,11 +97,11 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
}
protected Path getBuildPath(Path buildPath) {
- return new Path(buildPath, lifecycle.name());
+ return new Path(buildPath, getTag().name());
}
protected String getEntityName() {
- return EntityUtil.getWorkflowName(lifecycle, entity).toString();
+ return EntityUtil.getWorkflowName(getTag(), entity).toString();
}
protected Path marshal(Cluster cluster, COORDINATORAPP coord,
@@ -104,26 +114,32 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
String coordName) throws FalconException {
Properties props = new Properties();
props.put(WorkflowExecutionArgs.ENTITY_NAME.getName(), entity.getName());
+ props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
props.put(WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME_EL);
props.put(WorkflowExecutionArgs.TIMESTAMP.getName(), ACTUAL_TIME_EL);
- props.put("userBrokerUrl", ClusterHelper.getMessageBrokerUrl(cluster));
- props.put("userBrokerImplClass", ClusterHelper.getMessageBrokerImplClass(cluster));
- String falconBrokerUrl = StartupProperties.get().getProperty(
- WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true");
- props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
- String falconBrokerImplClass = StartupProperties.get().getProperty(
- WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
- props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
- String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
- DEFAULT_BROKER_MSG_TTL.toString());
- props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
- props.put(WorkflowExecutionArgs.ENTITY_TYPE.getName(), entity.getEntityType().name());
- props.put("logDir", getLogDirectory(cluster));
+ props.put("falconDataOperation", getOperation().name());
+
+ props.put(WorkflowExecutionArgs.LOG_DIR.getName(), getLogDirectory(cluster));
props.put(OozieClient.EXTERNAL_ID,
new ExternalId(entity.getName(), EntityUtil.getWorkflowNameTag(coordName, entity),
"${coord:nominalTime()}").getId());
- props.put("workflowEngineUrl", ClusterHelper.getOozieUrl(cluster));
+ props.put(WorkflowExecutionArgs.WF_ENGINE_URL.getName(), ClusterHelper.getOozieUrl(cluster));
+
+ addLateDataProperties(props);
+ addBrokerProperties(cluster, props);
+ props.put(MR_QUEUE_NAME, "default");
+ props.put(MR_JOB_PRIORITY, "NORMAL");
+
+ //props in entity override the set props.
+ props.putAll(getEntityProperties(entity));
+ return props;
+ }
+
+ protected abstract WorkflowExecutionContext.EntityOperations getOperation();
+
+ private void addLateDataProperties(Properties props) throws FalconException {
if (EntityUtil.getLateProcess(entity) == null
|| EntityUtil.getLateProcess(entity).getLateInputs() == null
|| EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
@@ -131,16 +147,25 @@ public abstract class OozieCoordinatorBuilder<T extends Entity> extends OozieEnt
} else {
props.put("shouldRecord", "true");
}
+ }
- props.put("entityName", entity.getName());
- props.put("entityType", entity.getEntityType().name().toLowerCase());
- props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), cluster.getName());
+ private void addBrokerProperties(Cluster cluster, Properties props) {
+ props.put(WorkflowExecutionArgs.USER_BRKR_URL.getName(),
+ ClusterHelper.getMessageBrokerUrl(cluster));
+ props.put(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(),
+ ClusterHelper.getMessageBrokerImplClass(cluster));
- props.put(MR_QUEUE_NAME, "default");
- props.put(MR_JOB_PRIORITY, "NORMAL");
- //props in entity override the set props.
- props.putAll(getEntityProperties(entity));
- return props;
+ String falconBrokerUrl = StartupProperties.get().getProperty(
+ "broker.url", "tcp://localhost:61616?daemon=true");
+ props.put(WorkflowExecutionArgs.BRKR_URL.getName(), falconBrokerUrl);
+
+ String falconBrokerImplClass = StartupProperties.get().getProperty(
+ "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ props.put(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), falconBrokerImplClass);
+
+ String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+ DEFAULT_BROKER_MSG_TTL.toString());
+ props.put(WorkflowExecutionArgs.BRKR_TTL.getName(), jmsMessageTTL);
}
protected CONFIGURATION getConfig(Properties props) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
index fa645a5..d232aaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieOrchestrationWorkflowBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
@@ -79,14 +80,22 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
public static final Set<String> FALCON_ACTIONS = new HashSet<String>(Arrays.asList(
new String[]{PREPROCESS_ACTION_NAME, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME, }));
- private final Tag lifecycle;
+ private final LifeCycle lifecycle;
- public OozieOrchestrationWorkflowBuilder(T entity, Tag lifecycle) {
+ public OozieOrchestrationWorkflowBuilder(T entity, LifeCycle lifecycle) {
super(entity);
this.lifecycle = lifecycle;
}
- public static final OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
+ public LifeCycle getLifecycle() {
+ return lifecycle;
+ }
+
+ public Tag getTag() {
+ return lifecycle.getTag();
+ }
+
+ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle)
throws FalconException {
switch (entity.getEntityType()) {
case FEED:
@@ -194,11 +203,9 @@ public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extend
}
protected boolean shouldPreProcess() throws FalconException {
- if (EntityUtil.getLateProcess(entity) == null || EntityUtil.getLateProcess(entity).getLateInputs() == null
- || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0) {
- return false;
- }
- return true;
+ return !(EntityUtil.getLateProcess(entity) == null
+ || EntityUtil.getLateProcess(entity).getLateInputs() == null
+ || EntityUtil.getLateProcess(entity).getLateInputs().size() == 0);
}
protected void addLibExtensionsToWorkflow(Cluster cluster, WORKFLOWAPP wf, Tag tag) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index d6115b2..f0864db 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -21,6 +21,7 @@ package org.apache.falcon.oozie.feed;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
@@ -45,6 +46,7 @@ import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -74,10 +76,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
public FeedReplicationCoordinatorBuilder(Feed entity) {
- super(entity, Tag.REPLICATION);
+ super(entity, LifeCycle.REPLICATION);
}
- @Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
+ @Override
+ public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
if (feedCluster.getType() == ClusterType.TARGET) {
List<Properties> props = new ArrayList<Properties>();
@@ -96,6 +99,11 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
return null;
}
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.REPLICATE;
+ }
+
private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
// Different workflow for each source since hive credentials vary for each cluster
@@ -180,7 +188,7 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
}
propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
- props.putAll(FeedHelper.getUserWorkflowProperties("replication"));
+ props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
workflow.setConfiguration(getConfig(props));
action.setWorkflow(workflow);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
index eafef32..288e9de 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationWorkflowBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie.feed;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -39,7 +40,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
protected static final String REPLICATION_ACTION_NAME = "replication";
public FeedReplicationWorkflowBuilder(Feed entity) {
- super(entity, Tag.REPLICATION);
+ super(entity, LifeCycle.REPLICATION);
}
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -52,15 +53,7 @@ public abstract class FeedReplicationWorkflowBuilder extends OozieOrchestrationW
addLibExtensionsToWorkflow(cluster, workflow, Tag.REPLICATION);
marshal(cluster, workflow, buildPath);
- Properties props = getProperties(buildPath, wfName);
- props.putAll(getWorkflowProperties());
- return props;
- }
-
- private Properties getWorkflowProperties() {
- Properties props = new Properties();
- props.setProperty("falconDataOperation", "REPLICATE");
- return props;
+ return getProperties(buildPath, wfName);
}
protected abstract WORKFLOWAPP getWorkflow(Cluster src, Cluster target) throws FalconException;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
index 2a67fd3..3c74485 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionCoordinatorBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie.feed;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
@@ -46,7 +47,7 @@ import java.util.Properties;
*/
public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
public FeedRetentionCoordinatorBuilder(Feed entity) {
- super(entity, Tag.RETENTION);
+ super(entity, LifeCycle.EVICTION);
}
@Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
@@ -85,15 +86,13 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
props.put("limit", feedCluster.getRetention().getLimit().toString());
- props.put(WorkflowExecutionArgs.OPERATION.getName(),
- WorkflowExecutionContext.EntityOperations.DELETE.name());
props.put(WorkflowExecutionArgs.FEED_NAMES.getName(), entity.getName());
props.put(WorkflowExecutionArgs.FEED_INSTANCE_PATHS.getName(), IGNORE);
props.put("falconInputFeeds", entity.getName());
props.put("falconInPaths", IGNORE);
- props.putAll(FeedHelper.getUserWorkflowProperties("eviction"));
+ props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
WORKFLOW workflow = new WORKFLOW();
Properties wfProp = OozieOrchestrationWorkflowBuilder.get(entity, cluster, Tag.RETENTION).build(cluster,
@@ -109,4 +108,9 @@ public class FeedRetentionCoordinatorBuilder extends OozieCoordinatorBuilder<Fee
Path marshalPath = marshal(cluster, coord, coordPath);
return Arrays.asList(getProperties(marshalPath, coordName));
}
+
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.DELETE;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
index 3aabb19..cbe055a 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedRetentionWorkflowBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie.feed;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -39,7 +40,7 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
private static final String EVICTION_ACTION_NAME = "eviction";
public FeedRetentionWorkflowBuilder(Feed entity) {
- super(entity, Tag.DEFAULT);
+ super(entity, LifeCycle.EVICTION);
}
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -75,7 +76,6 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
private Properties getWorkflowProperties() {
Properties props = new Properties();
- props.setProperty("falconDataOperation", "DELETE");
props.setProperty("srcClusterName", "NA");
return props;
}
@@ -109,5 +109,4 @@ public class FeedRetentionWorkflowBuilder extends OozieOrchestrationWorkflowBuil
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
index d391032..a33fa62 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionCoordinatorBuilder.java
@@ -20,6 +20,7 @@ package org.apache.falcon.oozie.process;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.EntityUtil;
@@ -50,6 +51,7 @@ import org.apache.falcon.oozie.coordinator.OUTPUTEVENTS;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
@@ -64,7 +66,7 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
private static final int THIRTY_MINUTES = 30 * 60 * 1000;
public ProcessExecutionCoordinatorBuilder(Process entity) {
- super(entity, Tag.DEFAULT);
+ super(entity, LifeCycle.EXECUTION);
}
@Override public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
@@ -106,6 +108,11 @@ public class ProcessExecutionCoordinatorBuilder extends OozieCoordinatorBuilder<
return Arrays.asList(getProperties(marshalPath, coordName));
}
+ @Override
+ protected WorkflowExecutionContext.EntityOperations getOperation() {
+ return WorkflowExecutionContext.EntityOperations.GENERATE;
+ }
+
private void initializeCoordAttributes(Cluster cluster, COORDINATORAPP coord, String coordName) {
coord.setName(coordName);
org.apache.falcon.entity.v0.process.Cluster processCluster = ProcessHelper.getCluster(entity,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
index 2eae7ca..865beaf 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/process/ProcessExecutionWorkflowBuilder.java
@@ -19,6 +19,7 @@
package org.apache.falcon.oozie.process;
import org.apache.falcon.FalconException;
+import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
@@ -58,7 +59,7 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
Arrays.asList(new String[]{PREPROCESS_ACTION_NAME, USER_ACTION_NAME, }));
protected ProcessExecutionWorkflowBuilder(Process entity) {
- super(entity, Tag.DEFAULT);
+ super(entity, LifeCycle.EXECUTION);
}
@Override public Properties build(Cluster cluster, Path buildPath) throws FalconException {
@@ -100,14 +101,13 @@ public abstract class ProcessExecutionWorkflowBuilder extends OozieOrchestration
marshal(cluster, wfApp, buildPath);
Properties props = getProperties(buildPath, wfName);
- props.putAll(getWorkflowProperties(cluster));
+ props.putAll(getWorkflowProperties());
return props;
}
- private Properties getWorkflowProperties(Cluster cluster) {
+ private Properties getWorkflowProperties() {
Properties props = new Properties();
- props.setProperty("falconDataOperation", "GENERATE");
props.setProperty("srcClusterName", "NA");
return props;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/main/resources/action/post-process.xml
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/action/post-process.xml b/oozie/src/main/resources/action/post-process.xml
index 1631d63..50cc0d4 100644
--- a/oozie/src/main/resources/action/post-process.xml
+++ b/oozie/src/main/resources/action/post-process.xml
@@ -49,15 +49,15 @@
<arg>-timeStamp</arg>
<arg>${timeStamp}</arg>
<arg>-brokerImplClass</arg>
- <arg>${wf:conf("broker.impl.class")}</arg>
+ <arg>${brokerImplClass}</arg>
<arg>-brokerUrl</arg>
- <arg>${wf:conf("broker.url")}</arg>
+ <arg>${brokerUrl}</arg>
<arg>-userBrokerImplClass</arg>
<arg>${userBrokerImplClass}</arg>
<arg>-userBrokerUrl</arg>
<arg>${userBrokerUrl}</arg>
<arg>-brokerTTL</arg>
- <arg>${wf:conf("broker.ttlInMins")}</arg>
+ <arg>${brokerTTL}</arg>
<arg>-feedNames</arg>
<arg>${feedNames}</arg>
<arg>-feedInstancePaths</arg>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 927aba3..e47895f 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -38,7 +38,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.COORDINATOR;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.process.AbstractTestBase;
@@ -49,6 +48,7 @@ import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -189,10 +189,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
Assert.assertEquals("${now(0,-40)}", outEventInstance);
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
+
+ verifyEntityProperties(feed, trgCluster,
+ WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+ verifyBrokerProperties(trgCluster, props);
// verify the replication param that feed replicator depends on
String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
@@ -208,7 +209,6 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
- Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, feed));
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
@@ -274,7 +274,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
- assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster.getName(), pathsWithPartitions);
+ assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster, pathsWithPartitions);
List<Properties> betaCoords = builder.buildCoords(betaTrgCluster, new Path("/beta/falcon/"));
final COORDINATORAPP betaCoord = getCoordinator(trgMiniDFS,
@@ -283,7 +283,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
- assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster.getName(), pathsWithPartitions);
+ assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions);
}
private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
@@ -302,9 +302,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
return parts;
}
- private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, String clusterName,
- String pathsWithPartitions) throws JAXBException, IOException {
- org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(aFeed, clusterName);
+ private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster,
+ String pathsWithPartitions) throws Exception {
+ org.apache.falcon.entity.v0.feed.Cluster feedCluster =
+ FeedHelper.getCluster(aFeed, aCluster.getName());
Date startDate = feedCluster.getValidity().getStart();
Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
@@ -319,10 +320,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
List<String> args = replication.getArg();
Assert.assertEquals(args.size(), 13);
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
@@ -331,7 +329,10 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
- Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, aFeed));
+
+ verifyEntityProperties(aFeed, aCluster,
+ WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+ verifyBrokerProperties(trgCluster, props);
}
public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
@@ -415,10 +416,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster, tableFeed);
final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster, tableFeed);
@@ -446,14 +444,18 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
- Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
- assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord), wfPath.toString());
+ assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
+ wfPath.toString());
+
+ verifyEntityProperties(tableFeed, trgCluster,
+ WorkflowExecutionContext.EntityOperations.REPLICATE, props);
+ verifyBrokerProperties(trgCluster, props);
}
private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
@@ -525,10 +527,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
String feedDataPath = props.get("feedDataPath");
String storageType = props.get("falconFeedStorageType");
@@ -549,9 +548,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
- Assert.assertEquals(props.get("logDir"), getLogPath(srcCluster, feed));
assertWorkflowRetries(coord);
+ verifyEntityProperties(feed, srcCluster,
+ WorkflowExecutionContext.EntityOperations.DELETE, props);
+ verifyBrokerProperties(srcCluster, props);
}
@Test (dataProvider = "secureOptions")
@@ -571,10 +572,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
String feedDataPath = props.get("feedDataPath");
String storageType = props.get("falconFeedStorageType");
@@ -595,9 +593,11 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "IGNORE");
- Assert.assertEquals(props.get("logDir"), getLogPath(trgCluster, tableFeed));
assertWorkflowRetries(coord);
+ verifyBrokerProperties(srcCluster, props);
+ verifyEntityProperties(tableFeed, trgCluster,
+ WorkflowExecutionContext.EntityOperations.DELETE, props);
Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
assertHCatCredentials(
@@ -632,9 +632,4 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
}
}
}
-
- private String getLogPath(Cluster aCluster, Feed aFeed) {
- Path logPath = EntityUtil.getLogPath(aCluster, aFeed);
- return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index a0962fc..b547c31 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -21,6 +21,7 @@ package org.apache.falcon.oozie.process;
import org.apache.falcon.FalconException;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -29,9 +30,13 @@ import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
+import org.apache.falcon.oozie.coordinator.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
@@ -48,6 +53,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
/**
@@ -199,4 +205,49 @@ public class AbstractTestBase {
Assert.assertEquals(action.getRetryInterval(), "1");
}
}
+
+ protected HashMap<String, String> getCoordProperties(COORDINATORAPP coord) {
+ HashMap<String, String> props = new HashMap<String, String>();
+ for (CONFIGURATION.Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
+ props.put(prop.getName(), prop.getValue());
+ }
+ return props;
+ }
+
+ protected void verifyEntityProperties(Entity entity, Cluster cluster,
+ WorkflowExecutionContext.EntityOperations operation,
+ HashMap<String, String> props) throws Exception {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
+ entity.getName());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
+ entity.getEntityType().name());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
+ Assert.assertEquals(props.get("falconDataOperation"), operation.name());
+ }
+
+ private String getLogPath(Cluster cluster, Entity entity) {
+ Path logPath = EntityUtil.getLogPath(cluster, entity);
+ return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+ }
+
+ protected void verifyBrokerProperties(Cluster cluster, HashMap<String, String> props) {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_URL.getName()),
+ ClusterHelper.getMessageBrokerUrl(cluster));
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName()),
+ ClusterHelper.getMessageBrokerImplClass(cluster));
+
+ String falconBrokerUrl = StartupProperties.get().getProperty(
+ "broker.url", "tcp://localhost:61616?daemon=true");
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_URL.getName()), falconBrokerUrl);
+
+ String falconBrokerImplClass = StartupProperties.get().getProperty(
+ "broker.impl.class", ClusterHelper.DEFAULT_BROKER_IMPL_CLASS);
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName()),
+ falconBrokerImplClass);
+
+ String jmsMessageTTL = StartupProperties.get().getProperty("broker.ttlInMins",
+ String.valueOf(3 * 24 * 60L));
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.BRKR_TTL.getName()), jmsMessageTTL);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
index 45a8732..45badee 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/OozieProcessWorkflowBuilderTest.java
@@ -44,7 +44,6 @@ import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.CONFIGURATION;
-import org.apache.falcon.oozie.coordinator.CONFIGURATION.Property;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.workflow.ACTION;
@@ -55,6 +54,7 @@ import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
+import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -186,19 +186,14 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
assertEquals(ds.getUriTemplate(),
FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
assertEquals(props.get("mapred.job.priority"), "LOW");
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
- assertLibExtensions(fs, coord, EntityType.PROCESS, null);
- }
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
- private String getLogPath(Process process) {
- Path logPath = EntityUtil.getLogPath(cluster, process);
- return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;
+ assertLibExtensions(fs, coord, EntityType.PROCESS, null);
}
@Test
@@ -285,10 +280,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
+
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
// verify table and hive props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -298,7 +294,6 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -349,11 +344,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
+ HashMap<String, String> props = getCoordProperties(coord);
+
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -404,11 +399,11 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
+ HashMap<String, String> props = getCoordProperties(coord);
+
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -455,11 +450,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
+ HashMap<String, String> props = getCoordProperties(coord);
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
@@ -546,10 +540,7 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
+ HashMap<String, String> props = getCoordProperties(coord);
// verify table props
Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
@@ -558,7 +549,9 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
}
}
- Assert.assertEquals(props.get("logDir"), getLogPath(process));
+ verifyEntityProperties(process, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
@@ -684,11 +677,10 @@ public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
- HashMap<String, String> props = new HashMap<String, String>();
- for (Property prop : coord.getAction().getWorkflow().getConfiguration().getProperty()) {
- props.put(prop.getName(), prop.getValue());
- }
- Assert.assertEquals(props.get("logDir"), getLogPath(processEntity));
+ HashMap<String, String> props = getCoordProperties(coord);
+ verifyEntityProperties(processEntity, cluster,
+ WorkflowExecutionContext.EntityOperations.GENERATE, props);
+ verifyBrokerProperties(cluster, props);
String[] expected = {
WorkflowExecutionArgs.FEED_NAMES.getName(),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index feddfdd..91559a5 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -150,8 +150,8 @@ public class FalconPostProcessingTest {
if (workflowUser != null) { // in case of user message, its NULL
Assert.assertEquals(workflowUser, "falcon");
}
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01T01:00Z");
- Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01T01:00Z");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.NOMINAL_TIME.getName()), "2011-01-01-01-00");
+ Assert.assertEquals(m.getString(WorkflowExecutionArgs.TIMESTAMP.getName()), "2012-01-01-01-00");
Assert.assertEquals(m.getString(WorkflowExecutionArgs.STATUS.getName()), "SUCCEEDED");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/07397774/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
----------------------------------------------------------------------
diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
index 76f0ac0..24c6ec2 100644
--- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
+++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunHandler.java
@@ -37,6 +37,7 @@ import org.apache.falcon.rerun.event.LaterunEvent;
import org.apache.falcon.rerun.policy.AbstractRerunPolicy;
import org.apache.falcon.rerun.policy.RerunPolicyFactory;
import org.apache.falcon.rerun.queue.DelayedQueue;
+import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.conf.Configuration;
@@ -65,6 +66,7 @@ public class LateRerunHandler<M extends DelayedQueue<LaterunEvent>> extends
if (wait == -1) {
LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
+ CurrentUser.authenticate(workflowUser);
java.util.Properties properties =
this.getWfEngine().getWorkflowProperties(cluster, wfId);
String logDir = properties.getProperty("logDir");