You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:55 UTC

[1/4] git commit: FALCON-666 Add Alerts for unrecoverable failures. Contributed by Venkatesh Seetharam

Repository: incubator-falcon
Updated Branches:
  refs/heads/master df6f1d8eb -> 33b420b5d


FALCON-666 Add Alerts for unrecoverable failures. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a94cb7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a94cb7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a94cb7a8

Branch: refs/heads/master
Commit: a94cb7a8b7c14f1819785f0ec5e51d8e2478d610
Parents: caa2284
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:37:36 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 common/src/main/resources/log4j.xml             |  9 +++
 .../falcon/aspect/AbstractFalconAspect.java     | 50 ++++++++++------
 .../org/apache/falcon/aspect/AlertMessage.java  | 56 ++++++++++++++++++
 .../org/apache/falcon/aspect/GenericAlert.java  |  7 +--
 .../java/org/apache/falcon/monitors/Alert.java  | 37 ++++++++++++
 .../apache/falcon/plugin/AlertingPlugin.java    | 30 ++++++++++
 .../org/apache/falcon/plugin/LoggingPlugin.java |  9 ++-
 .../falcon/util/ResourcesReflectionUtil.java    | 31 +++++-----
 .../apache/falcon/aspect/AlertMessageTest.java  | 53 +++++++++++++++++
 .../plugin/ChainableMonitoringPlugin.java       | 51 +++++++++++++++--
 .../plugin/ChainableMonitoringPluginTest.java   | 60 ++++++++++++++++++++
 src/conf/log4j.xml                              |  9 +++
 webapp/src/main/resources/log4j.xml             |  9 +++
 14 files changed, 369 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 824bddd..523b218 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-666 Add Alerts for unrecoverable failures (Venkatesh Seetharam)
+
    FALCON-665 Handle message consumption failures in JMSMessageConsumer
    (Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index dd1ef70..75c8267 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <logger name="org.apache.falcon" additivity="false">
         <level value="debug"/>
         <appender-ref ref="FILE"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
index 29c77ce..06bda3f 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
@@ -38,11 +38,11 @@ public abstract class AbstractFalconAspect {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractFalconAspect.class);
 
     @Around("@annotation(org.apache.falcon.monitors.Monitored)")
-    public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
+    public Object logAroundMonitored(ProceedingJoinPoint joinPoint) throws Throwable {
 
         String methodName = joinPoint.getSignature().getName();
         Object[] args = joinPoint.getArgs();
-        Object result = null;
+        Object result;
         ResourceMessage.Status status;
 
         long startTime = System.nanoTime();
@@ -52,8 +52,8 @@ public abstract class AbstractFalconAspect {
         } catch (Exception e) {
             endTime = System.nanoTime();
             status = ResourceMessage.Status.FAILED;
-            publishMessage(getResourceMessage(joinPoint.getSignature()
-                    .getDeclaringType().getSimpleName()
+            publishMessage(getResourceMessage(
+                    joinPoint.getSignature().getDeclaringType().getSimpleName()
                     + "." + methodName, args, status, endTime - startTime));
             throw e;
         }
@@ -65,13 +65,11 @@ public abstract class AbstractFalconAspect {
         return result;
     }
 
-    private ResourceMessage getResourceMessage(String methodName,
-                                               Object[] args, ResourceMessage.Status status, long executionTime) {
-        String action = ResourcesReflectionUtil
-                .getResourceMonitorName(methodName);
+    private ResourceMessage getResourceMessage(String methodName, Object[] args,
+                                               ResourceMessage.Status status, long executionTime) {
+        String action = ResourcesReflectionUtil.getResourceMonitorName(methodName);
 
-        assert action != null : "Method :" + methodName
-                + " not parsed by reflection util";
+        assert action != null : "Method :" + methodName + " not parsed by reflection util";
         Map<String, String> dimensions = new HashMap<String, String>();
 
         if (ResourcesReflectionUtil.getResourceDimensionsName(methodName) == null) {
@@ -79,17 +77,37 @@ public abstract class AbstractFalconAspect {
         } else {
             for (Map.Entry<Integer, String> param : ResourcesReflectionUtil
                     .getResourceDimensionsName(methodName).entrySet()) {
-                dimensions.put(
-                        param.getValue(),
-                        args[param.getKey()] == null ? "NULL" : args[param
-                                .getKey()].toString());
+                dimensions.put(param.getValue(),
+                        args[param.getKey()] == null ? "NULL" : args[param.getKey()].toString());
             }
         }
-        Integer timeTakenArg = ResourcesReflectionUtil
-                .getResourceTimeTakenName(methodName);
+
+        Integer timeTakenArg = ResourcesReflectionUtil.getResourceTimeTakenName(methodName);
         return timeTakenArg == null ? new ResourceMessage(action, dimensions, status, executionTime)
             : new ResourceMessage(action, dimensions, status, Long.valueOf(args[timeTakenArg].toString()));
     }
 
     public abstract void publishMessage(ResourceMessage message);
+
+    @Around("@annotation(org.apache.falcon.monitors.Alert)")
+    public Object logAroundAlert(ProceedingJoinPoint joinPoint) throws Throwable {
+
+        String methodName = joinPoint.getSignature().getName();
+        String event = ResourcesReflectionUtil.getResourceMonitorName(
+                joinPoint.getSignature().getDeclaringType().getSimpleName() + "." + methodName);
+        Object[] args = joinPoint.getArgs();
+        Object result;
+
+        try {
+            result = joinPoint.proceed();
+        } finally {
+            AlertMessage alertMessage = new AlertMessage(
+                    event, args[0].toString(), args[1].toString());
+            publishAlert(alertMessage);
+        }
+
+        return result;
+    }
+
+    public abstract void publishAlert(AlertMessage alertMessage);
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
new file mode 100644
index 0000000..0f38e34
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+/**
+ * Message to be sent to the alerting system.
+ */
+public class AlertMessage {
+
+    private final String event;
+    private final String alert;
+    private final String error;
+
+    public AlertMessage(String event, String alert, String error) {
+        this.event = event;
+        this.alert = alert;
+        this.error = error;
+    }
+
+    public String getEvent() {
+        return event;
+    }
+
+    public String getAlert() {
+        return alert;
+    }
+
+    public String getError() {
+        return error;
+    }
+
+    @Override
+    public String toString() {
+        return "AlertMessage{"
+                + "event='" + event + '\''
+                + ", alert='" + alert + '\''
+                + ", error='" + error + '\''
+                + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d8efc37..c7a86d9 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.aspect;
 
+import org.apache.falcon.monitors.Alert;
 import org.apache.falcon.monitors.Dimension;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.monitors.TimeTaken;
@@ -71,7 +72,6 @@ public final class GenericAlert {
             @Dimension(value = "error-message") String errorMessage,
             @Dimension(value = "message") String message,
             @TimeTaken long timeTaken) {
-
         return "IGNORE";
     }
 
@@ -87,7 +87,6 @@ public final class GenericAlert {
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,
             @TimeTaken long timeTaken) {
-
         return "IGNORE";
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
@@ -99,14 +98,14 @@ public final class GenericAlert {
         return "IGNORE";
     }
 
-    @Monitored(event = "log-cleanup-service-failed")
+    @Alert(event = "log-cleanup-service-failed")
     public static String alertLogCleanupServiceFailed(
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {
         return "IGNORE";
     }
 
-    @Monitored(event = "jms-message-consumer-failed")
+    @Alert(event = "jms-message-consumer-failed")
     public static String alertJMSMessageConsumerFailed(
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/monitors/Alert.java b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
new file mode 100644
index 0000000..03bd387
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.monitors;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Alert annotation for monitoring.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Alert {
+
+    /**
+     * @return Event name associated with this alert
+     */
+    String event();
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
new file mode 100644
index 0000000..8539ea3
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+
+/**
+ * Generic interface for receiving alerts.
+ */
+public interface AlertingPlugin {
+
+    void alert(AlertMessage message);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
index 77d0d30..c959fc2 100644
--- a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
+++ b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.plugin;
 
+import org.apache.falcon.aspect.AlertMessage;
 import org.apache.falcon.aspect.ResourceMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,11 +26,17 @@ import org.slf4j.LoggerFactory;
 /**
  * Plugin for logging metrics using log4j.
  */
-public class LoggingPlugin implements MonitoringPlugin {
+public class LoggingPlugin implements MonitoringPlugin, AlertingPlugin {
     private static final Logger METRIC = LoggerFactory.getLogger("METRIC");
+    private static final Logger ALERT = LoggerFactory.getLogger("ALERT");
 
     @Override
     public void monitor(ResourceMessage message) {
         METRIC.info("{}", message);
     }
+
+    @Override
+    public void alert(AlertMessage message) {
+        ALERT.info("{}", message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
index 6e80db8..3bafaee 100644
--- a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
+++ b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.falcon.util;
 
+import org.apache.falcon.monitors.Alert;
 import org.apache.falcon.monitors.Monitored;
 import org.apache.falcon.monitors.TimeTaken;
 
@@ -48,7 +49,8 @@ public final class ResourcesReflectionUtil {
     }
 
     public static Map<Integer, String> getResourceDimensionsName(String methodName) {
-        return METHODS.get(methodName) != null ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
+        return METHODS.get(methodName) != null
+                ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
     }
 
     public static String getResourceMonitorName(String methodName) {
@@ -56,8 +58,7 @@ public final class ResourcesReflectionUtil {
     }
 
     public static Integer getResourceTimeTakenName(String methodName) {
-        return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex
-                : null;
+        return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex : null;
     }
 
     /**
@@ -81,11 +82,11 @@ public final class ResourcesReflectionUtil {
     private static void buildAnnotationsMapForClass(String className) {
         Class clazz;
         try {
-            clazz = ResourcesReflectionUtil.class.
-                    getClassLoader().loadClass(className);
+            clazz = ResourcesReflectionUtil.class.getClassLoader().loadClass(className);
         } catch (ClassNotFoundException e) {
             throw new RuntimeException("Unable to load class " + className, e);
         }
+
         Method[] declMethods = clazz.getMethods();
 
         // scan every method
@@ -93,20 +94,17 @@ public final class ResourcesReflectionUtil {
             Annotation[] methodAnnots = declMethod.getDeclaredAnnotations();
             // scan every annotation on method
             for (Annotation methodAnnot : methodAnnots) {
-                if (methodAnnot.annotationType().getSimpleName()
-                        .equals(Monitored.class.getSimpleName())) {
+                final String simpleName = methodAnnot.annotationType().getSimpleName();
+                if (simpleName.equals(Monitored.class.getSimpleName())
+                        || simpleName.equals(Alert.class.getSimpleName())) {
                     MethodAnnotation annotation = new MethodAnnotation();
-                    annotation.monitoredName = getAnnotationValue(
-                            methodAnnot, "event");
-                    Annotation[][] paramAnnots = declMethod
-                            .getParameterAnnotations();
+                    annotation.monitoredName = getAnnotationValue(methodAnnot, "event");
+                    Annotation[][] paramAnnots = declMethod.getParameterAnnotations();
+
                     // scan every param
                     annotation.params = getDeclaredParamAnnots(paramAnnots, annotation);
-                    METHODS.put(
-                            clazz.getSimpleName() + "."
-                                    + declMethod.getName(), annotation);
+                    METHODS.put(clazz.getSimpleName() + "." + declMethod.getName(), annotation);
                 }
-
             }
         }
     }
@@ -126,8 +124,8 @@ public final class ResourcesReflectionUtil {
                 }
             }
         }
-        return params;
 
+        return params;
     }
 
     private static String getAnnotationValue(Annotation annotation,
@@ -145,5 +143,4 @@ public final class ResourcesReflectionUtil {
 
         return value;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
----------------------------------------------------------------------
diff --git a/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
new file mode 100644
index 0000000..d53a234
--- /dev/null
+++ b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the message sent to the alerting system.
+ */
+public class AlertMessageTest {
+
+    private final AlertMessage alertMessage = new AlertMessage(
+            "event", "alert", "error"
+    );
+
+    @Test
+    public void testGetEvent() throws Exception {
+        Assert.assertEquals(alertMessage.getEvent(), "event");
+    }
+
+    @Test
+    public void testGetAlert() throws Exception {
+        Assert.assertEquals(alertMessage.getAlert(), "alert");
+    }
+
+    @Test
+    public void testGetError() throws Exception {
+        Assert.assertEquals(alertMessage.getError(), "error");
+    }
+
+    @Test
+    public void testToString() throws Exception {
+        Assert.assertEquals(alertMessage.toString(),
+                "AlertMessage{event='event', alert='alert', error='error'}");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
index c695bb7..1a5a331 100644
--- a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
+++ b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
@@ -20,6 +20,7 @@ package org.apache.falcon.plugin;
 
 import org.apache.falcon.FalconException;
 import org.apache.falcon.aspect.AbstractFalconAspect;
+import org.apache.falcon.aspect.AlertMessage;
 import org.apache.falcon.aspect.ResourceMessage;
 import org.apache.falcon.util.ReflectionUtils;
 import org.apache.falcon.util.StartupProperties;
@@ -36,29 +37,51 @@ import java.util.List;
  * of {@link MonitoringPlugin}. {@link LoggingPlugin} is the default.
  */
 @Aspect
-public class ChainableMonitoringPlugin extends AbstractFalconAspect implements MonitoringPlugin {
+public class ChainableMonitoringPlugin extends AbstractFalconAspect
+        implements MonitoringPlugin, AlertingPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(ChainableMonitoringPlugin.class);
 
-    private List<MonitoringPlugin> plugins = new ArrayList<MonitoringPlugin>();
+    private List<MonitoringPlugin> monitoringPlugins = new ArrayList<MonitoringPlugin>();
+    private List<AlertingPlugin> alertingPlugins = new ArrayList<AlertingPlugin>();
 
     public ChainableMonitoringPlugin() {
+        initializeMonitoringPlugins();
+        initializeAlertingPlugins();
+    }
+
+    private void initializeMonitoringPlugins() {
         String pluginClasses = StartupProperties.get().
                 getProperty("monitoring.plugins", LoggingPlugin.class.getName());
         try {
             for (String pluginClass : pluginClasses.split(",")) {
                 MonitoringPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
-                plugins.add(plugin);
+                monitoringPlugins.add(plugin);
                 LOG.info("Registered Monitoring Plugin {}", pluginClass);
             }
         } catch (FalconException e) {
-            plugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
-            LOG.error("Unable to initialize monitoring plugins: {}", pluginClasses, e);
+            monitoringPlugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
+            LOG.error("Unable to initialize monitoring.plugins: {}", pluginClasses, e);
+        }
+    }
+
+    private void initializeAlertingPlugins() {
+        String pluginClasses = StartupProperties.get().
+                getProperty("alerting.plugins", LoggingPlugin.class.getName());
+        try {
+            for (String pluginClass : pluginClasses.split(",")) {
+                AlertingPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
+                alertingPlugins.add(plugin);
+                LOG.info("Registered Alerting Plugin {}", pluginClass);
+            }
+        } catch (FalconException e) {
+            alertingPlugins = Arrays.asList((AlertingPlugin) new LoggingPlugin());
+            LOG.error("Unable to initialize alerting.plugins: {}", pluginClasses, e);
         }
     }
 
     @Override
     public void monitor(ResourceMessage message) {
-        for (MonitoringPlugin plugin : plugins) {
+        for (MonitoringPlugin plugin : monitoringPlugins) {
             try {
                 plugin.monitor(message);
             } catch (Exception e) {
@@ -71,4 +94,20 @@ public class ChainableMonitoringPlugin extends AbstractFalconAspect implements M
     public void publishMessage(ResourceMessage message) {
         monitor(message);
     }
+
+    @Override
+    public void publishAlert(AlertMessage alertMessage) {
+        alert(alertMessage);
+    }
+
+    @Override
+    public void alert(AlertMessage alertMessage) {
+        for (AlertingPlugin plugin : alertingPlugins) {
+            try {
+                plugin.alert(alertMessage);
+            } catch (Exception e) {
+                LOG.debug("Unable to publish message to {}", plugin.getClass(), e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
new file mode 100644
index 0000000..9a386c6
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test for ChainableMonitoringPlugin.
+ */
+public class ChainableMonitoringPluginTest implements MonitoringPlugin, AlertingPlugin {
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        StartupProperties.get().
+                setProperty("monitoring.plugins", this.getClass().getName());
+        StartupProperties.get().
+                setProperty("alerting.plugins", this.getClass().getName());
+    }
+
+    @Test
+    public void testPlugin() throws Exception {
+        GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df",
+                "ef-id", "wf-user", "1", "DELETE", "now", "error", "none", 1242);
+        GenericAlert.alertJMSMessageConsumerFailed("test-alert", new Exception("test"));
+    }
+
+    @Override
+    public void monitor(ResourceMessage message) {
+        Assert.assertNotNull(message);
+        Assert.assertEquals(message.getAction(), "wf-instance-failed");
+    }
+
+    @Override
+    public void alert(AlertMessage alertMessage) {
+        Assert.assertNotNull(alertMessage);
+        Assert.assertEquals(alertMessage.getEvent(), "jms-message-consumer-failed");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 90abe26..1341c6e 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${falcon.log.dir}/${falcon.app.type}.security.audit.log"/>
         <param name="Append" value="true"/>

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 5ba6f16..3f9a191 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
         </layout>
     </appender>
 
+    <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+        <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+        <param name="Append" value="true"/>
+        <param name="Threshold" value="debug"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %m%n"/>
+        </layout>
+    </appender>
+
     <appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
         <param name="File" value="${user.dir}/target/logs/security.audit.log"/>
         <param name="Append" value="true"/>


[2/4] git commit: FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam

Posted by ve...@apache.org.
FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/caa22842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/caa22842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/caa22842

Branch: refs/heads/master
Commit: caa22842913419e1a3acab292ca7c5396d639d60
Parents: df6f1d8
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:35:22 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../workflow/WorkflowExecutionContext.java      |  5 ++
 .../WorkflowJobEndNotificationService.java      | 25 +++----
 .../WorkflowJobEndNotificationServiceTest.java  | 12 +---
 .../falcon/messaging/JMSMessageConsumer.java    | 75 ++++++++++++++------
 .../messaging/JMSMessageConsumerTest.java       | 16 ++++-
 .../org/apache/falcon/aspect/GenericAlert.java  | 10 +--
 7 files changed, 92 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f638a6..824bddd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
    FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
 
   IMPROVEMENTS
+   FALCON-665 Handle message consumption failures in JMSMessageConsumer
+   (Venkatesh Seetharam)
+
    FALCON-662 Fetch relationships for a given type API (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9c7b395..04ef037 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -285,6 +285,11 @@ public class WorkflowExecutionContext {
         }
     }
 
+    @Override
+    public String toString() {
+        return "WorkflowExecutionContext{" + context.toString() + "}";
+    }
+
     @SuppressWarnings("unchecked")
     public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fb2d58d..90b66c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -82,7 +82,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         listeners.remove(listener);
     }
 
-    public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
+    public void notifyFailure(WorkflowExecutionContext context) {
         for (WorkflowExecutionListener listener : listeners) {
             try {
                 listener.onFailure(context);
@@ -95,7 +95,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         instrumentAlert(context);
     }
 
-    public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
+    public void notifySuccess(WorkflowExecutionContext context) {
         for (WorkflowExecutionListener listener : listeners) {
             try {
                 listener.onSuccess(context);
@@ -108,7 +108,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
         instrumentAlert(context);
     }
 
-    private void instrumentAlert(WorkflowExecutionContext context) throws FalconException {
+    private void instrumentAlert(WorkflowExecutionContext context) {
         String clusterName = context.getClusterName();
         String entityName = context.getEntityName();
         String entityType = context.getEntityType();
@@ -118,14 +118,14 @@ public class WorkflowJobEndNotificationService implements FalconService {
         String nominalTime = context.getNominalTimeAsISO8601();
         String runId = String.valueOf(context.getWorkflowRunId());
 
-        CurrentUser.authenticate(context.getWorkflowUser());
-        AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
-        InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
-        Date startTime = result.getInstances()[0].startTime;
-        Date endTime = result.getInstances()[0].endTime;
-        Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
         try {
+            CurrentUser.authenticate(context.getWorkflowUser());
+            AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+            InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
+            Date startTime = result.getInstances()[0].startTime;
+            Date endTime = result.getInstances()[0].endTime;
+            Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
             if (context.hasWorkflowFailed()) {
                 GenericAlert.instrumentFailedInstance(clusterName, entityType,
                         entityName, nominalTime, workflowId, workflowUser, runId, operation,
@@ -135,8 +135,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
                         entityName, nominalTime, workflowId, workflowUser, runId, operation,
                         SchemaHelper.formatDateUTC(startTime), duration);
             }
-        } catch (Exception e) {
-            throw new FalconException(e);
+        } catch (FalconException e) {
+            // Logging an error and ignoring since there are listeners for extensions
+            LOG.error("Instrumenting alert failed for: " + context, e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 9a6ad98..b7df443 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -109,19 +109,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
     }
 
     private void notifyFailure(WorkflowExecutionContext context) {
-        try {
-            service.notifyFailure(context);
-        } catch (FalconException ignored) {
-            // do nothing
-        }
+        service.notifyFailure(context);
     }
 
     private void notifySuccess(WorkflowExecutionContext context) {
-        try {
-            service.notifySuccess(context);
-        } catch (FalconException ignored) {
-            // do nothing
-        }
+        service.notifySuccess(context);
     }
 
     private static String[] getTestMessageArgs() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ec7bd93..4a0bc2a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -48,6 +48,8 @@ import java.util.Map;
 public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
 
+    private static final String FALCON_CLIENT_ID = "falcon-server";
+
     private final String implementation;
     private final String userName;
     private final String password;
@@ -56,7 +58,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     private final WorkflowJobEndNotificationService jobEndNotificationService;
 
     private Connection connection;
-    private TopicSubscriber subscriber;
+    private TopicSession topicSession;
+    private TopicSubscriber topicSubscriber;
 
     public JMSMessageConsumer(String implementation, String userName,
                               String password, String url, String topicName,
@@ -72,15 +75,17 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     public void startSubscriber() throws FalconException {
         try {
             connection = createAndGetConnection(implementation, userName, password, url);
-            TopicSession session = (TopicSession) connection.createSession(
-                    false, Session.AUTO_ACKNOWLEDGE);
-            Topic destination = session.createTopic(topicName);
-            subscriber = session.createSubscriber(destination);
-            subscriber.setMessageListener(this);
+            connection.setClientID(FALCON_CLIENT_ID);
+
+            topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic destination = topicSession.createTopic(topicName);
+            topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+            topicSubscriber.setMessageListener(this);
+
             connection.setExceptionListener(this);
             connection.start();
         } catch (Exception e) {
-            LOG.error("Error starting subscriber of topic: " + this.toString(), e);
+            LOG.error("Error starting topicSubscriber of topic: " + this.toString(), e);
             throw new FalconException(e);
         }
     }
@@ -88,17 +93,19 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
     @Override
     public void onMessage(Message message) {
         MapMessage mapMessage = (MapMessage) message;
-        LOG.info("Received message {}", message.toString());
+        LOG.info("Received JMS message {}", message.toString());
 
         try {
             WorkflowExecutionContext context = createContext(mapMessage);
+            LOG.info("Created context from JMS message {}", context);
+
             if (context.hasWorkflowFailed()) {
                 onFailure(context);
             } else if (context.hasWorkflowSucceeded()) {
                 onSuccess(context);
             }
-        } catch (Exception e) {
-            String errorMessage = "Error in onMessage for subscriber of topic: "
+        } catch (JMSException e) {
+            String errorMessage = "Error in onMessage for topicSubscriber of topic: "
                     + topicName + ", Message: " + message.toString();
             LOG.info(errorMessage, e);
             GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
@@ -118,33 +125,57 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
         return WorkflowExecutionContext.create(wfProperties);
     }
 
-    public void onFailure(WorkflowExecutionContext context) throws FalconException {
+    public void onFailure(WorkflowExecutionContext context) {
         jobEndNotificationService.notifyFailure(context);
     }
 
-    public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+    public void onSuccess(WorkflowExecutionContext context) {
         jobEndNotificationService.notifySuccess(context);
     }
 
     @Override
     public void onException(JMSException ignore) {
-        String errorMessage = "Error in onException for subscriber of topic: " + topicName;
+        String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
         LOG.info(errorMessage, ignore);
         GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
     }
 
-    public void closeSubscriber() throws FalconException {
-        try {
-            LOG.info("Closing subscriber on topic : " + this.topicName);
-            if (subscriber != null) {
-                subscriber.close();
+    public void closeSubscriber() {
+        LOG.info("Closing topicSubscriber on topic : " + this.topicName);
+        // closing each quietly so client id can be unsubscribed
+        closeTopicSubscriberQuietly();
+        closeTopicSessionQuietly();
+        closeConnectionQuietly();
+    }
+
+    private void closeTopicSubscriberQuietly() {
+        if (topicSubscriber != null) {
+            try {
+                topicSubscriber.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
             }
-            if (connection != null) {
+        }
+    }
+
+    private void closeTopicSessionQuietly() {
+        if (topicSession != null) { // unsubscribe the durable topic subscriber
+            try {
+                topicSession.unsubscribe(FALCON_CLIENT_ID);
+                topicSession.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS topic session: " + topicSession, ignore);
+            }
+        }
+    }
+
+    private void closeConnectionQuietly() {
+        if (connection != null) {
+            try {
                 connection.close();
+            } catch (JMSException ignore) {
+                LOG.error("Error closing JMS connection: " + connection, ignore);
             }
-        } catch (JMSException e) {
-            LOG.error("Error closing subscriber of topic: " + this.toString(), e);
-            throw new FalconException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 9a4a6f7..974116d 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.messaging;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -59,6 +60,7 @@ public class JMSMessageConsumerTest {
         broker.setDataDirectory("target/activemq");
         broker.setBrokerName("localhost");
         broker.start();
+        broker.deleteAllMessages();
     }
 
     public void sendMessages(String topic) throws JMSException, FalconException, IOException {
@@ -137,11 +139,19 @@ public class JMSMessageConsumerTest {
                     BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
             subscriber.startSubscriber();
             sendMessages(TOPIC_NAME);
-            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+
+            final BrokerView adminView = broker.getAdminView();
+
+            Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+            Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
 
             sendMessages(SECONDARY_TOPIC_NAME);
-            Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
-            Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+
+            Assert.assertEquals(adminView.getTotalEnqueueCount(), 20);
+            Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+            Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
+
             subscriber.closeSubscriber();
         } catch (Exception e) {
             Assert.fail("This should not have thrown an exception.", e);

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d2019b5..d8efc37 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -55,7 +55,6 @@ public final class GenericAlert {
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "error-message") String message) {
         return "IGNORE";
-
     }
 
     @Monitored(event = "wf-instance-failed")
@@ -71,8 +70,7 @@ public final class GenericAlert {
             @Dimension(value = "start-time") String startTime,
             @Dimension(value = "error-message") String errorMessage,
             @Dimension(value = "message") String message,
-            @TimeTaken long timeTaken)
-        throws Exception {
+            @TimeTaken long timeTaken) {
 
         return "IGNORE";
     }
@@ -88,8 +86,7 @@ public final class GenericAlert {
             @Dimension(value = "run-id") String runId,
             @Dimension(value = "operation") String operation,
             @Dimension(value = "start-time") String startTime,
-            @TimeTaken long timeTaken)
-        throws Exception {
+            @TimeTaken long timeTaken) {
 
         return "IGNORE";
     }
@@ -100,7 +97,6 @@ public final class GenericAlert {
             @Dimension(value = "message") String message,
             @Dimension(value = "exception") Exception exception) {
         return "IGNORE";
-
     }
 
     @Monitored(event = "log-cleanup-service-failed")
@@ -112,7 +108,7 @@ public final class GenericAlert {
 
     @Monitored(event = "jms-message-consumer-failed")
     public static String alertJMSMessageConsumerFailed(
-            @Dimension(value = "error-message") String errorMessage,
+            @Dimension(value = "message") String message,
             @Dimension(value = "exception") Throwable throwable) {
         return "IGNORE";
     }


[3/4] git commit: FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh

Posted by ve...@apache.org.
FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/00c6f1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/00c6f1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/00c6f1e5

Branch: refs/heads/master
Commit: 00c6f1e5d3b3a92f7efbf872d330bb88f1df552c
Parents: a94cb7a
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 15:04:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   3 +
 .../org/apache/falcon/entity/ClusterHelper.java |  20 +---
 .../InstanceRelationshipGraphBuilder.java       |  68 ++++++-----
 .../falcon/metadata/MetadataMappingService.java |   1 +
 .../falcon/retention/EvictedInstanceSerDe.java  | 118 +++++++++++++++++++
 .../apache/falcon/retention/EvictionHelper.java |  88 --------------
 .../falcon/workflow/WorkflowExecutionArgs.java  |   2 +-
 .../workflow/WorkflowExecutionContext.java      |   2 +-
 .../metadata/MetadataMappingServiceTest.java    |  90 +++++++-------
 .../falcon/messaging/JMSMessageProducer.java    |  20 +---
 .../apache/falcon/retention/FeedEvictor.java    |   8 +-
 11 files changed, 216 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 523b218..a2bd724 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-731 Lineage capture for evicted instance is broken
+   (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-724 Build fails as Integration test fails (Balu Vellanki via
    Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index cb3ea08..2689cb7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,13 +18,13 @@
 
 package org.apache.falcon.entity;
 
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
+import org.apache.falcon.entity.v0.cluster.Property;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import java.util.HashMap;
@@ -40,12 +40,6 @@ public final class ClusterHelper {
     private ClusterHelper() {
     }
 
-    public static FileSystem getFileSystem(String cluster) throws FalconException {
-        Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
-        Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
-        return HadoopClientFactory.get().createProxiedFileSystem(conf);
-    }
-
     public static Configuration getConfiguration(Cluster cluster) {
         Configuration conf = new Configuration();
 
@@ -116,10 +110,6 @@ public final class ClusterHelper {
         return normalizedPath.substring(0, normalizedPath.length() - 1);
     }
 
-    public static String getCompleteLocation(Cluster cluster, String locationKey) {
-        return getStorageUrl(cluster) + "/" + getLocation(cluster, locationKey);
-    }
-
     public static String getLocation(Cluster cluster, String locationKey) {
         for (Location loc : cluster.getLocations().getLocations()) {
             if (loc.getName().equals(locationKey)) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 4d9fbcf..5b5d62c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Vertex;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.FeedHelper;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.common.FeedDataPath;
@@ -34,12 +33,10 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.LocationType;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URISyntaxException;
 
@@ -51,6 +48,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
 
     private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
+    private static final String IGNORE = "IGNORE";
 
     // process workflow properties from message
     private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
@@ -207,39 +205,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
-        String outputFeedNamesArg = context.getOutputFeedNames();
-        if ("NONE".equals(outputFeedNamesArg)) {
-            LOG.info("There are no output feeds for this process, return");
+        final String outputFeedPaths = context.getOutputFeedInstancePaths();
+        if (IGNORE.equals(outputFeedPaths)) {
+            LOG.info("There were no evicted instances, nothing to record");
             return;
         }
 
-        String logFile = context.getLogFile();
-        if (StringUtils.isEmpty(logFile)){
-            throw new IllegalArgumentException("csv log file path empty");
-        }
-
+        LOG.info("Recording lineage for evicted instances {}", outputFeedPaths);
+        // For retention there will be only one output feed name
+        String feedName = context.getOutputFeedNames();
+        String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList();
         String clusterName = context.getClusterName();
-        String[] paths = EvictionHelper.getInstancePaths(
-                ClusterHelper.getFileSystem(clusterName), new Path(logFile));
-        if (paths == null || paths.length <= 0) {
-            throw new IllegalArgumentException("No instance paths in log file");
-        }
 
-        // For retention there will be only one output feed name
-        String feedName = outputFeedNamesArg;
-        for (String feedInstanceDataPath : paths) {
-            LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                    + feedInstanceDataPath + ", in cluster: " + clusterName);
-            RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        for (String evictedFeedInstancePath : evictedFeedInstancePathList) {
+            LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}",
+                    feedName, evictedFeedInstancePath, clusterName);
             String feedInstanceName = getFeedInstanceName(feedName, clusterName,
-                    feedInstanceDataPath, context.getNominalTimeAsISO8601());
-            Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+                    evictedFeedInstancePath, context.getNominalTimeAsISO8601());
+            Vertex feedInstanceVertex = findVertex(feedInstanceName,
+                    RelationshipType.FEED_INSTANCE);
 
             LOG.info("Vertex exists? name={}, type={}, v={}",
-                    feedInstanceName, vertexType, feedInstanceVertex);
-            if (feedInstanceVertex == null) {
-                throw new IllegalStateException(vertexType
-                        + " instance vertex must exist " + feedInstanceName);
+                    feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+            if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+                LOG.info("{} instance vertex {} does not exist, add it",
+                        RelationshipType.FEED_INSTANCE, feedInstanceName);
+                feedInstanceVertex = addFeedInstance(// add a new instance
+                        feedInstanceName, context, feedName, clusterName);
             }
 
             addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -251,16 +243,20 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
                                  WorkflowExecutionContext context, String feedName,
                                  String feedInstanceDataPath) throws FalconException {
         String clusterName = context.getClusterName();
-        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + clusterName);
+        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+                feedInstanceDataPath, clusterName);
         String feedInstanceName = getFeedInstanceName(feedName, clusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        LOG.info("Adding feed instance: " + feedInstanceName);
+        Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+    }
+
+    private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
+                                   String feedName, String clusterName) throws FalconException {
+        LOG.info("Adding feed instance {}", feedInstanceName);
         Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
                 context.getTimeStampAsISO8601());
 
-        addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
         addInstanceToEntity(feedInstance, feedName,
                 RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
         addInstanceToEntity(feedInstance, clusterName,
@@ -273,6 +269,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
             addDataClassification(feed.getTags(), feedInstance);
             addGroups(feed.getGroups(), feedInstance);
         }
+
+        return feedInstance;
     }
 
     public static String getFeedInstanceName(String feedName, String clusterName,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index f607e0a..46f8a61 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -271,6 +271,7 @@ public class MetadataMappingService
 
         case DELETE:
             onFeedInstanceEvicted(context);
+            getTransactionalGraph().commit();
             break;
 
         default:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
new file mode 100644
index 0000000..c2f222b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Utility class for serializing and deserializing the evicted instance paths.
+ */
+
+public final class EvictedInstanceSerDe {
+
+    private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
+
+    private static final String INSTANCEPATH_PREFIX = "instancePaths=";
+    private static final String INSTANCES_SEPARATOR = "=";
+    public static final String INSTANCEPATH_SEPARATOR = ",";
+
+
+    private EvictedInstanceSerDe() {}
+
+    /**
+     * This method serializes the evicted instances to a file in logs dir for a given feed.
+     * @see org.apache.falcon.retention.FeedEvictor
+     *
+     * *Note:* This is executed within the map task for the evictor action
+     *
+     * @param fileSystem file system handle
+     * @param logFilePath       File path to serialize the instances to
+     * @param instances  list of instances, comma separated
+     * @throws IOException
+     */
+    public static void serializeEvictedInstancePaths(final FileSystem fileSystem,
+                                                     final Path logFilePath,
+                                                     StringBuffer instances) throws IOException {
+        LOG.info("Writing deleted instances {} to path {}", instances, logFilePath);
+        OutputStream out = null;
+        try {
+            out = fileSystem.create(logFilePath);
+            instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
+            out.write(instances.toString().getBytes());
+        } finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+
+        if (LOG.isDebugEnabled()) {
+            logEvictedInstancePaths(fileSystem, logFilePath);
+        }
+    }
+
+    private static void logEvictedInstancePaths(final FileSystem fs,
+                                                final Path outPath) throws IOException {
+        ByteArrayOutputStream writer = new ByteArrayOutputStream();
+        InputStream instance = fs.open(outPath);
+        IOUtils.copyBytes(instance, writer, 4096, true);
+        LOG.debug("Instance Paths copied to {}", outPath);
+        LOG.debug("Written {}", writer);
+    }
+
+    /**
+     * This method deserializes the evicted instances from a log file on hdfs.
+     * @see org.apache.falcon.messaging.JMSMessageProducer
+     * *Note:* This is executed within the falcon server
+     *
+     * @param fileSystem file system handle
+     * @param logFile    File path to deserialize the instances from
+     * @return array of evicted instance paths, empty if none were recorded
+     * @throws IOException
+     */
+    public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
+                                                           final Path logFile) throws IOException {
+        try {
+            ByteArrayOutputStream writer = new ByteArrayOutputStream();
+            InputStream instance = fileSystem.open(logFile);
+            IOUtils.copyBytes(instance, writer, 4096, true);
+            String[] instancePaths = writer.toString().split(INSTANCES_SEPARATOR);
+
+            LOG.info("Deleted feed instance paths file:" + logFile);
+            if (instancePaths.length == 1) {
+                LOG.debug("Returning 0 instance paths for feed ");
+                return new String[0];
+            } else {
+                LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+                return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
+            }
+        } finally {
+            // clean up the serialized state
+            fileSystem.delete(logFile, true);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
deleted file mode 100644
index 5d6481c..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Helper methods to facilitate eviction.
- */
-
-public final class EvictionHelper {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
-
-    private static final String INSTANCEPATH_FORMAT = "instancePaths=";
-    public static final String INSTANCEPATH_SEPARATOR = ",";
-
-
-    private EvictionHelper() {}
-
-    public static void logInstancePaths(final FileSystem logfs, final Path path,
-                                        final String data) throws IOException {
-        LOG.info("Writing deleted instances to path {}", path);
-        OutputStream out = logfs.create(path);
-        out.write(INSTANCEPATH_FORMAT.getBytes());
-        out.write(data.getBytes());
-        out.close();
-        debug(logfs, path);
-    }
-
-    public static String[] getInstancePaths(final FileSystem fs,
-                                            final Path logFile) throws FalconException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        try {
-            InputStream date = fs.open(logFile);
-            IOUtils.copyBytes(date, writer, 4096, true);
-        } catch (IOException e) {
-            throw new FalconException(e);
-        }
-        String logData = writer.toString();
-        if (StringUtils.isEmpty(logData)) {
-            throw new FalconException("csv file is empty");
-        }
-
-        String[] parts = logData.split(INSTANCEPATH_FORMAT);
-        if (parts.length != 2) {
-            throw new FalconException("Instance path in csv file not in required format: " + logData);
-        }
-
-        // part[0] is instancePaths=
-        return parts[1].split(INSTANCEPATH_SEPARATOR);
-    }
-
-    private static void debug(final FileSystem fs, final Path outPath) throws IOException {
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(outPath);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        LOG.debug("Instance Paths copied to {}", outPath);
-        LOG.debug("Written {}", writer);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 514bafe..0a8be64 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -75,7 +75,7 @@ public enum WorkflowExecutionArgs {
     BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
 
     // state maintained
-    LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
+    LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),
     // execution context data recorded
     LOG_DIR("logDir", "log dir where lineage can be recorded"),
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 04ef037..ef55ba9 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -87,7 +87,7 @@ public class WorkflowExecutionContext {
         WorkflowExecutionArgs.RUN_ID,
         WorkflowExecutionArgs.STATUS,
         WorkflowExecutionArgs.TIMESTAMP,
-        WorkflowExecutionArgs.LOG_FILE,
+        WorkflowExecutionArgs.LOG_DIR,
     };
 
     private final Map<WorkflowExecutionArgs, String> context;

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3b9fdba..895a5f7 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Edge;
 import com.tinkerpop.blueprints.Graph;
 import com.tinkerpop.blueprints.GraphQuery;
 import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
 import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
 import org.apache.falcon.entity.Storage;
 import org.apache.falcon.entity.store.ConfigurationStore;
@@ -41,7 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
 import org.apache.falcon.entity.v0.process.Output;
 import org.apache.falcon.entity.v0.process.Outputs;
 import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.StartupProperties;
@@ -49,7 +48,6 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
 import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
@@ -70,7 +68,6 @@ public class MetadataMappingServiceTest {
 
     public static final String FALCON_USER = "falcon-user";
     private static final String LOGS_DIR = "/falcon/staging/feed/logs";
-    private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
     private static final String NOMINAL_TIME = "2014-01-01-01-00";
 
     public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -93,7 +90,8 @@ public class MetadataMappingServiceTest {
     public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
     public static final String OUTPUT_INSTANCE_PATHS =
         "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
-    private static final String REPLICATED_INSTANCE = "raw-click";
+    private static final String REPLICATED_FEED = "raw-click";
+    private static final String EVICTED_FEED = "imp-click-join1";
     private static final String EVICTED_INSTANCE_PATHS =
             "jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
     public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
@@ -109,10 +107,6 @@ public class MetadataMappingServiceTest {
     private List<Feed> inputFeeds = new ArrayList<Feed>();
     private List<Feed> outputFeeds = new ArrayList<Feed>();
     private Process processEntity;
-    private EmbeddedCluster embeddedCluster;
-    private String hdfsUrl;
-    private static String logFilePath;
-
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -121,6 +115,8 @@ public class MetadataMappingServiceTest {
         configStore = ConfigurationStore.get();
 
         Services.get().register(new WorkflowJobEndNotificationService());
+        StartupProperties.get().setProperty("falcon.graph.storage.directory",
+                "target/graphdb-" + System.currentTimeMillis());
         StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
         service = new MetadataMappingService();
         service.init();
@@ -258,7 +254,8 @@ public class MetadataMappingServiceTest {
         GraphUtils.dump(service.getGraph());
 
         // Verify if instance name has nominal time
-        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+        List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
+                RelationshipType.FEED_INSTANCE.getName());
         List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
                 "imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
         Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
@@ -270,20 +267,20 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
-    public void  testLineageForReplication() throws Exception {
+    public void testLineageForReplication() throws Exception {
         setupForLineageReplication();
 
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
                         "jail://global:00/falcon/raw-click/bcp/20140101",
-                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
 
         debug(service.getGraph());
         GraphUtils.dump(service.getGraph());
 
-        verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
                 "jail://global:00/falcon/raw-click/bcp/20140101", context,
                 RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
 
@@ -303,12 +300,11 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
-    public void   testLineageForRetention() throws Exception {
-        setupForLineageEviciton();
-        String feedName = "imp-click-join1";
+    public void testLineageForRetention() throws Exception {
+        setupForLineageEviction();
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
                         EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
-                        feedName, "IGNORE", "IGNORE", feedName),
+                        EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
 
         service.onSuccess(context);
@@ -322,9 +318,9 @@ public class MetadataMappingServiceTest {
         List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
                 "imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
         verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
-        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+        String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         for (String feedInstanceDataPath : paths) {
-            verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+            verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
                     RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
         }
 
@@ -336,6 +332,27 @@ public class MetadataMappingServiceTest {
         Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
     }
 
+    @Test
+    public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
+        cleanUp();
+        service.init();
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+                        EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+        // No new vertices added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
+        // No new edges added
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
+    }
+
     @Test (dependsOnMethods = "testOnAdd")
     public void testOnChange() throws Exception {
         // shutdown the graph and resurrect for testing
@@ -673,7 +690,9 @@ public class MetadataMappingServiceTest {
         // feeds owned by a user
         List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName());
         Assert.assertEquals(feedNamesOwnedByUser,
-                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", "imp-click-join2"));
+                Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
+                        "imp-click-join2")
+        );
 
         // feeds classified as secure
         verifyFeedsClassifiedAsSecure(feedType.getName(),
@@ -736,7 +755,8 @@ public class MetadataMappingServiceTest {
                 }
             }
         }
-        Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+        Assert.assertTrue(actual.containsAll(expected),
+                "Actual does not contain expected: " + actual);
     }
 
     public long getVerticesCount(final Graph graph) {
@@ -855,8 +875,6 @@ public class MetadataMappingServiceTest {
             "-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
 
             "-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
-            "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
-            (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
         };
     }
 
@@ -907,7 +925,7 @@ public class MetadataMappingServiceTest {
         Cluster[] clusters = {clusterEntity, bcpCluster};
 
         // Add feed
-        Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+        Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters,
                 "classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
                 "/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
         // Add uri template for each cluster
@@ -945,22 +963,12 @@ public class MetadataMappingServiceTest {
                 EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
                 "jail://global:00/falcon/imp-click-join1/20140101",
                 "jail://global:00/falcon/raw-click/primary/20140101",
-                REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
+                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
     }
 
-    private void setupForLineageEviciton() throws Exception {
-        cleanUp();
-        service.init();
-
-        // Add cluster
-        embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
-                "classification=production");
-        clusterEntity = embeddedCluster.getCluster();
-        configStore.publish(EntityType.CLUSTER, clusterEntity);
-        hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
-
-        addFeedsAndProcess(clusterEntity);
+    private void setupForLineageEviction() throws Exception {
+        setup();
 
         // GENERATE WF should have run before this to create all instance related vertices
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -968,12 +976,6 @@ public class MetadataMappingServiceTest {
                         "imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
                 WorkflowExecutionContext.Type.POST_PROCESSING);
         service.onSuccess(context);
-
-        // Write to csv file
-        String csvData = EVICTED_INSTANCE_PATHS;
-        logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
-        Path path = new Path(logFilePath);
-        EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
     }
 
     private void setupForNoDateInFeedPath() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 0181e74..a60e951 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,12 +18,12 @@
 
 package org.apache.falcon.messaging;
 
+import org.apache.falcon.retention.EvictedInstanceSerDe;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;
 import org.apache.falcon.workflow.WorkflowExecutionContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,9 +35,7 @@ import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.Session;
 import javax.jms.Topic;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -290,19 +288,7 @@ public class JMSMessageProducer {
             return new String[0];
         }
 
-        ByteArrayOutputStream writer = new ByteArrayOutputStream();
-        InputStream instance = fs.open(logFile);
-        IOUtils.copyBytes(instance, writer, 4096, true);
-        String[] instancePaths = writer.toString().split("=");
-        fs.delete(logFile, true);
-        LOG.info("Deleted feed instance paths file:" + logFile);
-        if (instancePaths.length == 1) {
-            LOG.debug("Returning 0 instance paths for feed ");
-            return new String[0];
-        } else {
-            LOG.debug("Returning instance paths for feed " + instancePaths[1]);
-            return instancePaths[1].split(",");
-        }
+        return EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, logFile);
     }
 
     private Map<String, String> buildMessage(final WorkflowExecutionArgs[] filter) {
@@ -311,6 +297,8 @@ public class JMSMessageProducer {
             message.put(arg.getName(), context.getValue(arg));
         }
 
+        // the log file path is NOT useful in the message since the file is deleted after message is sent
+        message.remove(WorkflowExecutionArgs.LOG_FILE.getName());
         return message;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 114071f..9589edf 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -126,7 +126,8 @@ public class FeedEvictor extends Configured implements Tool {
         evict(storage, retentionLimit, timeZone);
 
         Path path = new Path(logFile);
-        EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
+        EvictedInstanceSerDe.serializeEvictedInstancePaths(
+                path.getFileSystem(getConf()), path, instancePaths);
 
         int len = buffer.length();
         if (len > 0) {
@@ -180,7 +181,7 @@ public class FeedEvictor extends Configured implements Tool {
             deleteInstance(fs, path, feedBasePath);
             Date date = getDate(path, feedPath, dateMask, timeZone);
             buffer.append(dateFormat.format(date)).append(',');
-            instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+            instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
         }
     }
 
@@ -532,7 +533,7 @@ public class FeedEvictor extends Configured implements Tool {
                 String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
                 LOG.info("Deleted partition: " + partitionInfo);
                 buffer.append(partSpec).append(',');
-                instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+                instancePaths.append(partitionInfo).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
             }
         }
     }
@@ -553,5 +554,4 @@ public class FeedEvictor extends Configured implements Tool {
             }
         }
     }
-
 }


[4/4] git commit: FALCON-732 Lineage capture fails for an instance thats not generated by falcon. Contributed by Sowmya Ramesh

Posted by ve...@apache.org.
FALCON-732 Lineage capture fails for an instance thats not generated by falcon. Contributed by Sowmya Ramesh


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33b420b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33b420b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33b420b5

Branch: refs/heads/master
Commit: 33b420b5d04cd08c936ddf9d6e8081eab32015c4
Parents: 00c6f1e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 16:48:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 +
 .../InstanceRelationshipGraphBuilder.java       | 35 ++++-----
 .../falcon/metadata/MetadataMappingService.java |  1 +
 .../workflow/WorkflowExecutionContext.java      | 22 +++++-
 .../metadata/MetadataMappingServiceTest.java    | 82 ++++++++++++++------
 .../feed/FeedReplicationCoordinatorBuilder.java |  4 +
 .../feed/OozieFeedWorkflowBuilderTest.java      |  6 +-
 .../falcon/oozie/process/AbstractTestBase.java  | 15 +++-
 8 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2bd724..0558ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
   OPTIMIZATIONS
 
   BUG FIXES
+   FALCON-732 Lineage capture fails for an instance thats not generated by
+   falcon (Sowmya Ramesh via Venkatesh Seetharam)
+
    FALCON-731 Lineage capture for evicted instance is broken
    (Sowmya Ramesh via Venkatesh Seetharam)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 5b5d62c..764e732 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -68,7 +68,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
 
     public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
         String processInstanceName = getProcessInstanceName(context);
-        LOG.info("Adding process instance: " + processInstanceName);
+        LOG.info("Adding process instance: {}", processInstanceName);
 
         Vertex processInstance = addVertex(processInstanceName,
                 RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
@@ -175,29 +175,24 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
     }
 
     public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
-        String outputFeedNamesArg = context.getOutputFeedNames();
-        if ("NONE".equals(outputFeedNamesArg)) {
-            return; // there are no output feeds
-        }
-
-        String[] outputFeedNames = context.getOutputFeedNamesList();
-        String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+        // For replication there will be only one output feed name and path
+        String feedName = context.getOutputFeedNames();
+        String feedInstanceDataPath = context.getOutputFeedInstancePaths();
         String targetClusterName = context.getClusterName();
 
-        // For replication there will be only one output feed name
-        String feedName = outputFeedNames[0];
-        String feedInstanceDataPath = outputFeedInstancePaths[0];
-
-        LOG.info("Computing feed instance for : name=" + feedName + ", path= "
-                + feedInstanceDataPath + ", in cluster: " + targetClusterName);
-        RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+        LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+                feedInstanceDataPath, targetClusterName);
         String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
                 feedInstanceDataPath, context.getNominalTimeAsISO8601());
-        Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
-
-        LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
-        if (feedInstanceVertex == null) {
-            throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+        Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+        LOG.info("Vertex exists? name={}, type={}, v={}",
+                feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+        if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+            LOG.info("{} instance vertex {} does not exist, add it",
+                    RelationshipType.FEED_INSTANCE, feedInstanceName);
+            feedInstanceVertex = addFeedInstance( // add a new instance
+                    feedInstanceName, context, feedName, context.getSrcClusterName());
         }
 
         addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 46f8a61..0a77bf1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -267,6 +267,7 @@ public class MetadataMappingService
 
         case REPLICATE:
             onFeedInstanceReplicated(context);
+            getTransactionalGraph().commit();
             break;
 
         case DELETE:

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index ef55ba9..4c94c27 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,6 +54,7 @@ public class WorkflowExecutionContext {
 
     public static final String OUTPUT_FEED_SEPARATOR = ",";
     public static final String INPUT_FEED_SEPARATOR = "#";
+    public static final String CLUSTER_NAME_SEPARATOR = ",";
 
     /**
      * Workflow execution status.
@@ -160,7 +161,26 @@ public class WorkflowExecutionContext {
     }
 
     public String getClusterName() {
-        return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        return value.split(CLUSTER_NAME_SEPARATOR)[0];
+    }
+
+    public String getSrcClusterName() {
+        String value =  getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+        if (EntityOperations.REPLICATE != getOperation()) {
+            return value;
+        }
+
+        String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+        if (parts.length != 2) {
+            throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+        }
+
+        return parts[1];
     }
 
     public String getEntityName() {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 895a5f7..11d27fe 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -169,7 +169,7 @@ public class MetadataMappingServiceTest {
                 "/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
         inputFeeds.add(impressionsFeed);
         verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
-        verifyFeedEntityEdges(impressionsFeed.getName());
+        verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
         Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
         Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
 
@@ -300,6 +300,37 @@ public class MetadataMappingServiceTest {
     }
 
     @Test
+    public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
+        cleanUp();
+        service.init();
+
+        addClusterAndFeedForReplication();
+        // Get the vertices before running replication WF
+        long beforeVerticesCount = getVerticesCount(service.getGraph());
+        long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                        EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+                        "jail://global:00/falcon/raw-click/bcp/20140101",
+                        "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+                WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+
+        debug(service.getGraph());
+        GraphUtils.dump(service.getGraph());
+
+        verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
+        verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+                "jail://global:00/falcon/raw-click/bcp/20140101", context,
+                RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+        // +1 for the new instance vertex added
+        Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
+        // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
+        Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
+    }
+
+    @Test
     public void testLineageForRetention() throws Exception {
         setupForLineageEviction();
         WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -605,7 +636,7 @@ public class MetadataMappingServiceTest {
                 "production", RelationshipType.TAGS.getName());
     }
 
-    private void verifyFeedEntityEdges(String feedName) {
+    private void verifyFeedEntityEdges(String feedName, String tag, String group) {
         Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
 
         // verify edge to cluster vertex
@@ -614,12 +645,13 @@ public class MetadataMappingServiceTest {
         // verify edge to user vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(),
                 FALCON_USER, RelationshipType.USER.getName());
+
         // verify edge to tags vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
-                "Secure", RelationshipType.TAGS.getName());
+                tag, RelationshipType.TAGS.getName());
         // verify edge to group vertex
         verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(),
-                "analytics", RelationshipType.GROUPS.getName());
+                group, RelationshipType.GROUPS.getName());
     }
 
     private void verifyProcessEntityEdges() {
@@ -834,7 +866,7 @@ public class MetadataMappingServiceTest {
                                                String falconInputFeeds) {
         String cluster;
         if (EntityOperations.REPLICATE == operation) {
-            cluster = BCP_CLUSTER_ENTITY_NAME;
+            cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
         } else {
             cluster = CLUSTER_ENTITY_NAME;
         }
@@ -916,6 +948,28 @@ public class MetadataMappingServiceTest {
         cleanUp();
         service.init();
 
+        addClusterAndFeedForReplication();
+
+        // Add output feed
+        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+        outputFeeds.add(join1Feed);
+
+        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+                WORKFLOW_VERSION);
+
+        // GENERATE WF should have run before this to create all instance related vertices
+        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+                "jail://global:00/falcon/imp-click-join1/20140101",
+                "jail://global:00/falcon/raw-click/primary/20140101",
+                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
+        service.onSuccess(context);
+    }
+
+    private void addClusterAndFeedForReplication() throws Exception {
         // Add cluster
         clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
                 "classification=production");
@@ -947,24 +1001,6 @@ public class MetadataMappingServiceTest {
             configStore.cleanupUpdateInit();
         }
         inputFeeds.add(rawFeed);
-
-        // Add output feed
-        Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
-                "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
-                "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
-        outputFeeds.add(join1Feed);
-
-        processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
-                "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
-                WORKFLOW_VERSION);
-
-        // GENERATE WF should have run before this to create all instance related vertices
-        WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
-                EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
-                "jail://global:00/falcon/imp-click-join1/20140101",
-                "jail://global:00/falcon/raw-click/primary/20140101",
-                REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
-        service.onSuccess(context);
     }
 
     private void setupForLineageEviction() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..801d733 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
 
         workflow.setAppPath(getStoragePath(buildPath));
         Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+        // Override CLUSTER_NAME property to include both source and target cluster pair
+        String clusterProperty = trgCluster.getName()
+                + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+        props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
         props.put("srcClusterName", srcCluster.getName());
         props.put("srcClusterColo", srcCluster.getColo());
         if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
 
         HashMap<String, String> props = getCoordProperties(coord);
 
-        verifyEntityProperties(feed, trgCluster,
+        verifyEntityProperties(feed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
 
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         Assert.assertEquals(props.get("maxMaps"), "33");
         Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
 
-        verifyEntityProperties(aFeed, aCluster,
+        verifyEntityProperties(aFeed, aCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
         assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
                 wfPath.toString());
 
-        verifyEntityProperties(tableFeed, trgCluster,
+        verifyEntityProperties(tableFeed, trgCluster, srcCluster,
                 WorkflowExecutionContext.EntityOperations.REPLICATE, props);
         verifyBrokerProperties(trgCluster, props);
     }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..b549cfb 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -214,18 +214,29 @@ public class AbstractTestBase {
         return props;
     }
 
-    protected void verifyEntityProperties(Entity entity, Cluster cluster,
+    protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
                                           WorkflowExecutionContext.EntityOperations operation,
                                           HashMap<String, String> props) throws Exception {
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
                 entity.getName());
         Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
                 entity.getEntityType().name());
-        Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
+                    cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
+        } else {
+            Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+        }
         Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
         Assert.assertEquals(props.get("falconDataOperation"), operation.name());
     }
 
+    protected void verifyEntityProperties(Entity entity, Cluster cluster,
+                                          WorkflowExecutionContext.EntityOperations operation,
+                                          HashMap<String, String> props) throws Exception {
+        verifyEntityProperties(entity, cluster, null, operation, props);
+    }
+
     private String getLogPath(Cluster cluster, Entity entity) {
         Path logPath = EntityUtil.getLogPath(cluster, entity);
         return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;