You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/09/18 01:56:55 UTC
[1/4] git commit: FALCON-666 Add Alerts for unrecoverable failures.
Contributed by Venkatesh Seetharam
Repository: incubator-falcon
Updated Branches:
refs/heads/master df6f1d8eb -> 33b420b5d
FALCON-666 Add Alerts for unrecoverable failures. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/a94cb7a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/a94cb7a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/a94cb7a8
Branch: refs/heads/master
Commit: a94cb7a8b7c14f1819785f0ec5e51d8e2478d610
Parents: caa2284
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:37:36 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
common/src/main/resources/log4j.xml | 9 +++
.../falcon/aspect/AbstractFalconAspect.java | 50 ++++++++++------
.../org/apache/falcon/aspect/AlertMessage.java | 56 ++++++++++++++++++
.../org/apache/falcon/aspect/GenericAlert.java | 7 +--
.../java/org/apache/falcon/monitors/Alert.java | 37 ++++++++++++
.../apache/falcon/plugin/AlertingPlugin.java | 30 ++++++++++
.../org/apache/falcon/plugin/LoggingPlugin.java | 9 ++-
.../falcon/util/ResourcesReflectionUtil.java | 31 +++++-----
.../apache/falcon/aspect/AlertMessageTest.java | 53 +++++++++++++++++
.../plugin/ChainableMonitoringPlugin.java | 51 +++++++++++++++--
.../plugin/ChainableMonitoringPluginTest.java | 60 ++++++++++++++++++++
src/conf/log4j.xml | 9 +++
webapp/src/main/resources/log4j.xml | 9 +++
14 files changed, 369 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 824bddd..523b218 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,8 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-666 Add Alerts for unrecoverable failures (Venkatesh Seetharam)
+
FALCON-665 Handle message consumption failures in JMSMessageConsumer
(Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/common/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml
index dd1ef70..75c8267 100644
--- a/common/src/main/resources/log4j.xml
+++ b/common/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
</layout>
</appender>
+ <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/>
+ </layout>
+ </appender>
+
<logger name="org.apache.falcon" additivity="false">
<level value="debug"/>
<appender-ref ref="FILE"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
index 29c77ce..06bda3f 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AbstractFalconAspect.java
@@ -38,11 +38,11 @@ public abstract class AbstractFalconAspect {
private static final Logger LOG = LoggerFactory.getLogger(AbstractFalconAspect.class);
@Around("@annotation(org.apache.falcon.monitors.Monitored)")
- public Object logAround(ProceedingJoinPoint joinPoint) throws Throwable {
+ public Object logAroundMonitored(ProceedingJoinPoint joinPoint) throws Throwable {
String methodName = joinPoint.getSignature().getName();
Object[] args = joinPoint.getArgs();
- Object result = null;
+ Object result;
ResourceMessage.Status status;
long startTime = System.nanoTime();
@@ -52,8 +52,8 @@ public abstract class AbstractFalconAspect {
} catch (Exception e) {
endTime = System.nanoTime();
status = ResourceMessage.Status.FAILED;
- publishMessage(getResourceMessage(joinPoint.getSignature()
- .getDeclaringType().getSimpleName()
+ publishMessage(getResourceMessage(
+ joinPoint.getSignature().getDeclaringType().getSimpleName()
+ "." + methodName, args, status, endTime - startTime));
throw e;
}
@@ -65,13 +65,11 @@ public abstract class AbstractFalconAspect {
return result;
}
- private ResourceMessage getResourceMessage(String methodName,
- Object[] args, ResourceMessage.Status status, long executionTime) {
- String action = ResourcesReflectionUtil
- .getResourceMonitorName(methodName);
+ private ResourceMessage getResourceMessage(String methodName, Object[] args,
+ ResourceMessage.Status status, long executionTime) {
+ String action = ResourcesReflectionUtil.getResourceMonitorName(methodName);
- assert action != null : "Method :" + methodName
- + " not parsed by reflection util";
+ assert action != null : "Method :" + methodName + " not parsed by reflection util";
Map<String, String> dimensions = new HashMap<String, String>();
if (ResourcesReflectionUtil.getResourceDimensionsName(methodName) == null) {
@@ -79,17 +77,37 @@ public abstract class AbstractFalconAspect {
} else {
for (Map.Entry<Integer, String> param : ResourcesReflectionUtil
.getResourceDimensionsName(methodName).entrySet()) {
- dimensions.put(
- param.getValue(),
- args[param.getKey()] == null ? "NULL" : args[param
- .getKey()].toString());
+ dimensions.put(param.getValue(),
+ args[param.getKey()] == null ? "NULL" : args[param.getKey()].toString());
}
}
- Integer timeTakenArg = ResourcesReflectionUtil
- .getResourceTimeTakenName(methodName);
+
+ Integer timeTakenArg = ResourcesReflectionUtil.getResourceTimeTakenName(methodName);
return timeTakenArg == null ? new ResourceMessage(action, dimensions, status, executionTime)
: new ResourceMessage(action, dimensions, status, Long.valueOf(args[timeTakenArg].toString()));
}
public abstract void publishMessage(ResourceMessage message);
+
+ @Around("@annotation(org.apache.falcon.monitors.Alert)")
+ public Object logAroundAlert(ProceedingJoinPoint joinPoint) throws Throwable {
+
+ String methodName = joinPoint.getSignature().getName();
+ String event = ResourcesReflectionUtil.getResourceMonitorName(
+ joinPoint.getSignature().getDeclaringType().getSimpleName() + "." + methodName);
+ Object[] args = joinPoint.getArgs();
+ Object result;
+
+ try {
+ result = joinPoint.proceed();
+ } finally {
+ AlertMessage alertMessage = new AlertMessage(
+ event, args[0].toString(), args[1].toString());
+ publishAlert(alertMessage);
+ }
+
+ return result;
+ }
+
+ public abstract void publishAlert(AlertMessage alertMessage);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
new file mode 100644
index 0000000..0f38e34
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/aspect/AlertMessage.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+/**
+ * Message to be sent to the alerting system.
+ */
+public class AlertMessage {
+
+ private final String event;
+ private final String alert;
+ private final String error;
+
+ public AlertMessage(String event, String alert, String error) {
+ this.event = event;
+ this.alert = alert;
+ this.error = error;
+ }
+
+ public String getEvent() {
+ return event;
+ }
+
+ public String getAlert() {
+ return alert;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+ @Override
+ public String toString() {
+ return "AlertMessage{"
+ + "event='" + event + '\''
+ + ", alert='" + alert + '\''
+ + ", error='" + error + '\''
+ + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d8efc37..c7a86d9 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -17,6 +17,7 @@
*/
package org.apache.falcon.aspect;
+import org.apache.falcon.monitors.Alert;
import org.apache.falcon.monitors.Dimension;
import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.monitors.TimeTaken;
@@ -71,7 +72,6 @@ public final class GenericAlert {
@Dimension(value = "error-message") String errorMessage,
@Dimension(value = "message") String message,
@TimeTaken long timeTaken) {
-
return "IGNORE";
}
@@ -87,7 +87,6 @@ public final class GenericAlert {
@Dimension(value = "operation") String operation,
@Dimension(value = "start-time") String startTime,
@TimeTaken long timeTaken) {
-
return "IGNORE";
}
//RESUME CHECKSTYLE CHECK ParameterNumberCheck
@@ -99,14 +98,14 @@ public final class GenericAlert {
return "IGNORE";
}
- @Monitored(event = "log-cleanup-service-failed")
+ @Alert(event = "log-cleanup-service-failed")
public static String alertLogCleanupServiceFailed(
@Dimension(value = "message") String message,
@Dimension(value = "exception") Throwable throwable) {
return "IGNORE";
}
- @Monitored(event = "jms-message-consumer-failed")
+ @Alert(event = "jms-message-consumer-failed")
public static String alertJMSMessageConsumerFailed(
@Dimension(value = "message") String message,
@Dimension(value = "exception") Throwable throwable) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/monitors/Alert.java b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
new file mode 100644
index 0000000..03bd387
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/monitors/Alert.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.monitors;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Alert annotation for monitoring.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface Alert {
+
+ /**
+ * @return Event name associated with this alert
+ */
+ String event();
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
new file mode 100644
index 0000000..8539ea3
--- /dev/null
+++ b/metrics/src/main/java/org/apache/falcon/plugin/AlertingPlugin.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+
+/**
+ * Generic interface for receiving alerts.
+ */
+public interface AlertingPlugin {
+
+ void alert(AlertMessage message);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
index 77d0d30..c959fc2 100644
--- a/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
+++ b/metrics/src/main/java/org/apache/falcon/plugin/LoggingPlugin.java
@@ -18,6 +18,7 @@
package org.apache.falcon.plugin;
+import org.apache.falcon.aspect.AlertMessage;
import org.apache.falcon.aspect.ResourceMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,11 +26,17 @@ import org.slf4j.LoggerFactory;
/**
* Plugin for logging metrics using log4j.
*/
-public class LoggingPlugin implements MonitoringPlugin {
+public class LoggingPlugin implements MonitoringPlugin, AlertingPlugin {
private static final Logger METRIC = LoggerFactory.getLogger("METRIC");
+ private static final Logger ALERT = LoggerFactory.getLogger("ALERT");
@Override
public void monitor(ResourceMessage message) {
METRIC.info("{}", message);
}
+
+ @Override
+ public void alert(AlertMessage message) {
+ ALERT.info("{}", message);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
index 6e80db8..3bafaee 100644
--- a/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
+++ b/metrics/src/main/java/org/apache/falcon/util/ResourcesReflectionUtil.java
@@ -18,6 +18,7 @@
package org.apache.falcon.util;
+import org.apache.falcon.monitors.Alert;
import org.apache.falcon.monitors.Monitored;
import org.apache.falcon.monitors.TimeTaken;
@@ -48,7 +49,8 @@ public final class ResourcesReflectionUtil {
}
public static Map<Integer, String> getResourceDimensionsName(String methodName) {
- return METHODS.get(methodName) != null ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
+ return METHODS.get(methodName) != null
+ ? Collections.unmodifiableMap(METHODS.get(methodName).params) : null;
}
public static String getResourceMonitorName(String methodName) {
@@ -56,8 +58,7 @@ public final class ResourcesReflectionUtil {
}
public static Integer getResourceTimeTakenName(String methodName) {
- return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex
- : null;
+ return METHODS.get(methodName) != null ? METHODS.get(methodName).timeTakenArgIndex : null;
}
/**
@@ -81,11 +82,11 @@ public final class ResourcesReflectionUtil {
private static void buildAnnotationsMapForClass(String className) {
Class clazz;
try {
- clazz = ResourcesReflectionUtil.class.
- getClassLoader().loadClass(className);
+ clazz = ResourcesReflectionUtil.class.getClassLoader().loadClass(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to load class " + className, e);
}
+
Method[] declMethods = clazz.getMethods();
// scan every method
@@ -93,20 +94,17 @@ public final class ResourcesReflectionUtil {
Annotation[] methodAnnots = declMethod.getDeclaredAnnotations();
// scan every annotation on method
for (Annotation methodAnnot : methodAnnots) {
- if (methodAnnot.annotationType().getSimpleName()
- .equals(Monitored.class.getSimpleName())) {
+ final String simpleName = methodAnnot.annotationType().getSimpleName();
+ if (simpleName.equals(Monitored.class.getSimpleName())
+ || simpleName.equals(Alert.class.getSimpleName())) {
MethodAnnotation annotation = new MethodAnnotation();
- annotation.monitoredName = getAnnotationValue(
- methodAnnot, "event");
- Annotation[][] paramAnnots = declMethod
- .getParameterAnnotations();
+ annotation.monitoredName = getAnnotationValue(methodAnnot, "event");
+ Annotation[][] paramAnnots = declMethod.getParameterAnnotations();
+
// scan every param
annotation.params = getDeclaredParamAnnots(paramAnnots, annotation);
- METHODS.put(
- clazz.getSimpleName() + "."
- + declMethod.getName(), annotation);
+ METHODS.put(clazz.getSimpleName() + "." + declMethod.getName(), annotation);
}
-
}
}
}
@@ -126,8 +124,8 @@ public final class ResourcesReflectionUtil {
}
}
}
- return params;
+ return params;
}
private static String getAnnotationValue(Annotation annotation,
@@ -145,5 +143,4 @@ public final class ResourcesReflectionUtil {
return value;
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
----------------------------------------------------------------------
diff --git a/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
new file mode 100644
index 0000000..d53a234
--- /dev/null
+++ b/metrics/src/test/java/org/apache/falcon/aspect/AlertMessageTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.aspect;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Test Message to be sent to the alerting system.
+ */
+public class AlertMessageTest {
+
+ private final AlertMessage alertMessage = new AlertMessage(
+ "event", "alert", "error"
+ );
+
+ @Test
+ public void testGetEvent() throws Exception {
+ Assert.assertEquals(alertMessage.getEvent(), "event");
+ }
+
+ @Test
+ public void testGetAlert() throws Exception {
+ Assert.assertEquals(alertMessage.getAlert(), "alert");
+ }
+
+ @Test
+ public void testGetError() throws Exception {
+ Assert.assertEquals(alertMessage.getError(), "error");
+ }
+
+ @Test
+ public void testToString() throws Exception {
+ Assert.assertEquals(alertMessage.toString(),
+ "AlertMessage{event='event', alert='alert', error='error'}");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
index c695bb7..1a5a331 100644
--- a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
+++ b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
@@ -20,6 +20,7 @@ package org.apache.falcon.plugin;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.AbstractFalconAspect;
+import org.apache.falcon.aspect.AlertMessage;
import org.apache.falcon.aspect.ResourceMessage;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
@@ -36,29 +37,51 @@ import java.util.List;
* of {@link MonitoringPlugin}. {@link LoggingPlugin} is the default.
*/
@Aspect
-public class ChainableMonitoringPlugin extends AbstractFalconAspect implements MonitoringPlugin {
+public class ChainableMonitoringPlugin extends AbstractFalconAspect
+ implements MonitoringPlugin, AlertingPlugin {
private static final Logger LOG = LoggerFactory.getLogger(ChainableMonitoringPlugin.class);
- private List<MonitoringPlugin> plugins = new ArrayList<MonitoringPlugin>();
+ private List<MonitoringPlugin> monitoringPlugins = new ArrayList<MonitoringPlugin>();
+ private List<AlertingPlugin> alertingPlugins = new ArrayList<AlertingPlugin>();
public ChainableMonitoringPlugin() {
+ initializeMonitoringPlugins();
+ initializeAlertingPlugins();
+ }
+
+ private void initializeMonitoringPlugins() {
String pluginClasses = StartupProperties.get().
getProperty("monitoring.plugins", LoggingPlugin.class.getName());
try {
for (String pluginClass : pluginClasses.split(",")) {
MonitoringPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
- plugins.add(plugin);
+ monitoringPlugins.add(plugin);
LOG.info("Registered Monitoring Plugin {}", pluginClass);
}
} catch (FalconException e) {
- plugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
- LOG.error("Unable to initialize monitoring plugins: {}", pluginClasses, e);
+ monitoringPlugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
+ LOG.error("Unable to initialize monitoring.plugins: {}", pluginClasses, e);
+ }
+ }
+
+ private void initializeAlertingPlugins() {
+ String pluginClasses = StartupProperties.get().
+ getProperty("alerting.plugins", LoggingPlugin.class.getName());
+ try {
+ for (String pluginClass : pluginClasses.split(",")) {
+ AlertingPlugin plugin = ReflectionUtils.getInstanceByClassName(pluginClass.trim());
+ alertingPlugins.add(plugin);
+ LOG.info("Registered Alerting Plugin {}", pluginClass);
+ }
+ } catch (FalconException e) {
+ alertingPlugins = Arrays.asList((AlertingPlugin) new LoggingPlugin());
+ LOG.error("Unable to initialize alerting.plugins: {}", pluginClasses, e);
}
}
@Override
public void monitor(ResourceMessage message) {
- for (MonitoringPlugin plugin : plugins) {
+ for (MonitoringPlugin plugin : monitoringPlugins) {
try {
plugin.monitor(message);
} catch (Exception e) {
@@ -71,4 +94,20 @@ public class ChainableMonitoringPlugin extends AbstractFalconAspect implements M
public void publishMessage(ResourceMessage message) {
monitor(message);
}
+
+ @Override
+ public void publishAlert(AlertMessage alertMessage) {
+ alert(alertMessage);
+ }
+
+ @Override
+ public void alert(AlertMessage alertMessage) {
+ for (AlertingPlugin plugin : alertingPlugins) {
+ try {
+ plugin.alert(alertMessage);
+ } catch (Exception e) {
+ LOG.debug("Unable to publish message to {}", plugin.getClass(), e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
new file mode 100644
index 0000000..9a386c6
--- /dev/null
+++ b/prism/src/test/java/org/apache/falcon/plugin/ChainableMonitoringPluginTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.plugin;
+
+import org.apache.falcon.aspect.AlertMessage;
+import org.apache.falcon.aspect.GenericAlert;
+import org.apache.falcon.aspect.ResourceMessage;
+import org.apache.falcon.util.StartupProperties;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Test for ChainableMonitoringPlugin.
+ */
+public class ChainableMonitoringPluginTest implements MonitoringPlugin, AlertingPlugin {
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ StartupProperties.get().
+ setProperty("monitoring.plugins", this.getClass().getName());
+ StartupProperties.get().
+ setProperty("alerting.plugins", this.getClass().getName());
+ }
+
+ @Test
+ public void testPlugin() throws Exception {
+ GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df",
+ "ef-id", "wf-user", "1", "DELETE", "now", "error", "none", 1242);
+ GenericAlert.alertJMSMessageConsumerFailed("test-alert", new Exception("test"));
+ }
+
+ @Override
+ public void monitor(ResourceMessage message) {
+ Assert.assertNotNull(message);
+ Assert.assertEquals(message.getAction(), "wf-instance-failed");
+ }
+
+ @Override
+ public void alert(AlertMessage alertMessage) {
+ Assert.assertNotNull(alertMessage);
+ Assert.assertEquals(alertMessage.getEvent(), "jms-message-consumer-failed");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/src/conf/log4j.xml
----------------------------------------------------------------------
diff --git a/src/conf/log4j.xml b/src/conf/log4j.xml
index 90abe26..1341c6e 100644
--- a/src/conf/log4j.xml
+++ b/src/conf/log4j.xml
@@ -51,6 +51,15 @@
</layout>
</appender>
+ <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/>
+ </layout>
+ </appender>
+
<appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${falcon.log.dir}/${falcon.app.type}.security.audit.log"/>
<param name="Append" value="true"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a94cb7a8/webapp/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/webapp/src/main/resources/log4j.xml b/webapp/src/main/resources/log4j.xml
index 5ba6f16..3f9a191 100644
--- a/webapp/src/main/resources/log4j.xml
+++ b/webapp/src/main/resources/log4j.xml
@@ -54,6 +54,15 @@
</layout>
</appender>
+ <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/>
+ </layout>
+ </appender>
+
<appender name="SECURITY" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="${user.dir}/target/logs/security.audit.log"/>
<param name="Append" value="true"/>
[2/4] git commit: FALCON-665 Handle message consumption failures in
JMSMessageConsumer. Contributed by Venkatesh Seetharam
Posted by ve...@apache.org.
FALCON-665 Handle message consumption failures in JMSMessageConsumer. Contributed by Venkatesh Seetharam
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/caa22842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/caa22842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/caa22842
Branch: refs/heads/master
Commit: caa22842913419e1a3acab292ca7c5396d639d60
Parents: df6f1d8
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 14:35:22 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:56 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../workflow/WorkflowExecutionContext.java | 5 ++
.../WorkflowJobEndNotificationService.java | 25 +++----
.../WorkflowJobEndNotificationServiceTest.java | 12 +---
.../falcon/messaging/JMSMessageConsumer.java | 75 ++++++++++++++------
.../messaging/JMSMessageConsumerTest.java | 16 ++++-
.../org/apache/falcon/aspect/GenericAlert.java | 10 +--
7 files changed, 92 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5f638a6..824bddd 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -33,6 +33,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-665 Handle message consumption failures in JMSMessageConsumer
+ (Venkatesh Seetharam)
+
FALCON-662 Fetch relationships for a given type API (Balu Vellanki via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 9c7b395..04ef037 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -285,6 +285,11 @@ public class WorkflowExecutionContext {
}
}
+ @Override
+ public String toString() {
+ return "WorkflowExecutionContext{" + context.toString() + "}";
+ }
+
@SuppressWarnings("unchecked")
public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException {
try {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
index fb2d58d..90b66c8 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java
@@ -82,7 +82,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
listeners.remove(listener);
}
- public void notifyFailure(WorkflowExecutionContext context) throws FalconException {
+ public void notifyFailure(WorkflowExecutionContext context) {
for (WorkflowExecutionListener listener : listeners) {
try {
listener.onFailure(context);
@@ -95,7 +95,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
instrumentAlert(context);
}
- public void notifySuccess(WorkflowExecutionContext context) throws FalconException {
+ public void notifySuccess(WorkflowExecutionContext context) {
for (WorkflowExecutionListener listener : listeners) {
try {
listener.onSuccess(context);
@@ -108,7 +108,7 @@ public class WorkflowJobEndNotificationService implements FalconService {
instrumentAlert(context);
}
- private void instrumentAlert(WorkflowExecutionContext context) throws FalconException {
+ private void instrumentAlert(WorkflowExecutionContext context) {
String clusterName = context.getClusterName();
String entityName = context.getEntityName();
String entityType = context.getEntityType();
@@ -118,14 +118,14 @@ public class WorkflowJobEndNotificationService implements FalconService {
String nominalTime = context.getNominalTimeAsISO8601();
String runId = String.valueOf(context.getWorkflowRunId());
- CurrentUser.authenticate(context.getWorkflowUser());
- AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
- Date startTime = result.getInstances()[0].startTime;
- Date endTime = result.getInstances()[0].endTime;
- Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
-
try {
+ CurrentUser.authenticate(context.getWorkflowUser());
+ AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+ InstancesResult result = wfEngine.getJobDetails(clusterName, workflowId);
+ Date startTime = result.getInstances()[0].startTime;
+ Date endTime = result.getInstances()[0].endTime;
+ Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
+
if (context.hasWorkflowFailed()) {
GenericAlert.instrumentFailedInstance(clusterName, entityType,
entityName, nominalTime, workflowId, workflowUser, runId, operation,
@@ -135,8 +135,9 @@ public class WorkflowJobEndNotificationService implements FalconService {
entityName, nominalTime, workflowId, workflowUser, runId, operation,
SchemaHelper.formatDateUTC(startTime), duration);
}
- } catch (Exception e) {
- throw new FalconException(e);
+ } catch (FalconException e) {
+ // Logging an error and ignoring since there are listeners for extensions
+ LOG.error("Instrumenting alert failed for: " + context, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
index 9a6ad98..b7df443 100644
--- a/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/workflow/WorkflowJobEndNotificationServiceTest.java
@@ -109,19 +109,11 @@ public class WorkflowJobEndNotificationServiceTest implements WorkflowExecutionL
}
private void notifyFailure(WorkflowExecutionContext context) {
- try {
- service.notifyFailure(context);
- } catch (FalconException ignored) {
- // do nothing
- }
+ service.notifyFailure(context);
}
private void notifySuccess(WorkflowExecutionContext context) {
- try {
- service.notifySuccess(context);
- } catch (FalconException ignored) {
- // do nothing
- }
+ service.notifySuccess(context);
}
private static String[] getTestMessageArgs() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
index ec7bd93..4a0bc2a 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageConsumer.java
@@ -48,6 +48,8 @@ import java.util.Map;
public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(JMSMessageConsumer.class);
+ private static final String FALCON_CLIENT_ID = "falcon-server";
+
private final String implementation;
private final String userName;
private final String password;
@@ -56,7 +58,8 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
private final WorkflowJobEndNotificationService jobEndNotificationService;
private Connection connection;
- private TopicSubscriber subscriber;
+ private TopicSession topicSession;
+ private TopicSubscriber topicSubscriber;
public JMSMessageConsumer(String implementation, String userName,
String password, String url, String topicName,
@@ -72,15 +75,17 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
public void startSubscriber() throws FalconException {
try {
connection = createAndGetConnection(implementation, userName, password, url);
- TopicSession session = (TopicSession) connection.createSession(
- false, Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic(topicName);
- subscriber = session.createSubscriber(destination);
- subscriber.setMessageListener(this);
+ connection.setClientID(FALCON_CLIENT_ID);
+
+ topicSession = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic destination = topicSession.createTopic(topicName);
+ topicSubscriber = topicSession.createDurableSubscriber(destination, FALCON_CLIENT_ID);
+ topicSubscriber.setMessageListener(this);
+
connection.setExceptionListener(this);
connection.start();
} catch (Exception e) {
- LOG.error("Error starting subscriber of topic: " + this.toString(), e);
+ LOG.error("Error starting topicSubscriber of topic: " + this.toString(), e);
throw new FalconException(e);
}
}
@@ -88,17 +93,19 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
- LOG.info("Received message {}", message.toString());
+ LOG.info("Received JMS message {}", message.toString());
try {
WorkflowExecutionContext context = createContext(mapMessage);
+ LOG.info("Created context from JMS message {}", context);
+
if (context.hasWorkflowFailed()) {
onFailure(context);
} else if (context.hasWorkflowSucceeded()) {
onSuccess(context);
}
- } catch (Exception e) {
- String errorMessage = "Error in onMessage for subscriber of topic: "
+ } catch (JMSException e) {
+ String errorMessage = "Error in onMessage for topicSubscriber of topic: "
+ topicName + ", Message: " + message.toString();
LOG.info(errorMessage, e);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, e);
@@ -118,33 +125,57 @@ public class JMSMessageConsumer implements MessageListener, ExceptionListener {
return WorkflowExecutionContext.create(wfProperties);
}
- public void onFailure(WorkflowExecutionContext context) throws FalconException {
+ public void onFailure(WorkflowExecutionContext context) {
jobEndNotificationService.notifyFailure(context);
}
- public void onSuccess(WorkflowExecutionContext context) throws FalconException {
+ public void onSuccess(WorkflowExecutionContext context) {
jobEndNotificationService.notifySuccess(context);
}
@Override
public void onException(JMSException ignore) {
- String errorMessage = "Error in onException for subscriber of topic: " + topicName;
+ String errorMessage = "Error in onException for topicSubscriber of topic: " + topicName;
LOG.info(errorMessage, ignore);
GenericAlert.alertJMSMessageConsumerFailed(errorMessage, ignore);
}
- public void closeSubscriber() throws FalconException {
- try {
- LOG.info("Closing subscriber on topic : " + this.topicName);
- if (subscriber != null) {
- subscriber.close();
+ public void closeSubscriber() {
+ LOG.info("Closing topicSubscriber on topic : " + this.topicName);
+ // closing each quietly so client id can be unsubscribed
+ closeTopicSubscriberQuietly();
+ closeTopicSessionQuietly();
+ closeConnectionQuietly();
+ }
+
+ private void closeTopicSubscriberQuietly() {
+ if (topicSubscriber != null) {
+ try {
+ topicSubscriber.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS topic subscriber: " + topicSubscriber, ignore);
}
- if (connection != null) {
+ }
+ }
+
+ private void closeTopicSessionQuietly() {
+ if (topicSession != null) { // unsubscribe the durable topic topicSubscriber
+ try {
+ topicSession.unsubscribe(FALCON_CLIENT_ID);
+ topicSession.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS topic session: " + topicSession, ignore);
+ }
+ }
+ }
+
+ private void closeConnectionQuietly() {
+ if (connection != null) {
+ try {
connection.close();
+ } catch (JMSException ignore) {
+ LOG.error("Error closing JMS connection: " + connection, ignore);
}
- } catch (JMSException e) {
- LOG.error("Error closing subscriber of topic: " + this.toString(), e);
- throw new FalconException(e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
----------------------------------------------------------------------
diff --git a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
index 9a4a6f7..974116d 100644
--- a/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
+++ b/messaging/src/test/java/org/apache/falcon/messaging/JMSMessageConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.messaging;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.falcon.FalconException;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
@@ -59,6 +60,7 @@ public class JMSMessageConsumerTest {
broker.setDataDirectory("target/activemq");
broker.setBrokerName("localhost");
broker.start();
+ broker.deleteAllMessages();
}
public void sendMessages(String topic) throws JMSException, FalconException, IOException {
@@ -137,11 +139,19 @@ public class JMSMessageConsumerTest {
BROKER_URL, TOPIC_NAME+","+SECONDARY_TOPIC_NAME, new WorkflowJobEndNotificationService());
subscriber.startSubscriber();
sendMessages(TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 9);
+
+ final BrokerView adminView = broker.getAdminView();
+
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 11);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 2);
sendMessages(SECONDARY_TOPIC_NAME);
- Assert.assertEquals(broker.getAdminView().getTotalEnqueueCount(), 17);
- Assert.assertEquals(broker.getAdminView().getTotalConsumerCount(), 2);
+
+ Assert.assertEquals(adminView.getTotalEnqueueCount(), 20);
+ Assert.assertEquals(adminView.getTotalDequeueCount(), 0);
+ Assert.assertEquals(adminView.getTotalConsumerCount(), 3);
+
subscriber.closeSubscriber();
} catch (Exception e) {
Assert.fail("This should not have thrown an exception.", e);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/caa22842/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
----------------------------------------------------------------------
diff --git a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
index d2019b5..d8efc37 100644
--- a/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
+++ b/metrics/src/main/java/org/apache/falcon/aspect/GenericAlert.java
@@ -55,7 +55,6 @@ public final class GenericAlert {
@Dimension(value = "run-id") String runId,
@Dimension(value = "error-message") String message) {
return "IGNORE";
-
}
@Monitored(event = "wf-instance-failed")
@@ -71,8 +70,7 @@ public final class GenericAlert {
@Dimension(value = "start-time") String startTime,
@Dimension(value = "error-message") String errorMessage,
@Dimension(value = "message") String message,
- @TimeTaken long timeTaken)
- throws Exception {
+ @TimeTaken long timeTaken) {
return "IGNORE";
}
@@ -88,8 +86,7 @@ public final class GenericAlert {
@Dimension(value = "run-id") String runId,
@Dimension(value = "operation") String operation,
@Dimension(value = "start-time") String startTime,
- @TimeTaken long timeTaken)
- throws Exception {
+ @TimeTaken long timeTaken) {
return "IGNORE";
}
@@ -100,7 +97,6 @@ public final class GenericAlert {
@Dimension(value = "message") String message,
@Dimension(value = "exception") Exception exception) {
return "IGNORE";
-
}
@Monitored(event = "log-cleanup-service-failed")
@@ -112,7 +108,7 @@ public final class GenericAlert {
@Monitored(event = "jms-message-consumer-failed")
public static String alertJMSMessageConsumerFailed(
- @Dimension(value = "error-message") String errorMessage,
+ @Dimension(value = "message") String message,
@Dimension(value = "exception") Throwable throwable) {
return "IGNORE";
}
[3/4] git commit: FALCON-731 Lineage capture for evicted instance is
broken. Contributed by Sowmya Ramesh
Posted by ve...@apache.org.
FALCON-731 Lineage capture for evicted instance is broken. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/00c6f1e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/00c6f1e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/00c6f1e5
Branch: refs/heads/master
Commit: 00c6f1e5d3b3a92f7efbf872d330bb88f1df552c
Parents: a94cb7a
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 15:04:48 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../org/apache/falcon/entity/ClusterHelper.java | 20 +---
.../InstanceRelationshipGraphBuilder.java | 68 ++++++-----
.../falcon/metadata/MetadataMappingService.java | 1 +
.../falcon/retention/EvictedInstanceSerDe.java | 118 +++++++++++++++++++
.../apache/falcon/retention/EvictionHelper.java | 88 --------------
.../falcon/workflow/WorkflowExecutionArgs.java | 2 +-
.../workflow/WorkflowExecutionContext.java | 2 +-
.../metadata/MetadataMappingServiceTest.java | 90 +++++++-------
.../falcon/messaging/JMSMessageProducer.java | 20 +---
.../apache/falcon/retention/FeedEvictor.java | 8 +-
11 files changed, 216 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 523b218..a2bd724 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-731 Lineage capture for evicted instance is broken
+ (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-724 Build fails as Integration test fails (Balu Vellanki via
Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
index cb3ea08..2689cb7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
+++ b/common/src/main/java/org/apache/falcon/entity/ClusterHelper.java
@@ -18,13 +18,13 @@
package org.apache.falcon.entity;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.store.ConfigurationStore;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.cluster.*;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.cluster.Interface;
+import org.apache.falcon.entity.v0.cluster.Interfacetype;
+import org.apache.falcon.entity.v0.cluster.Location;
+import org.apache.falcon.entity.v0.cluster.Property;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.util.HashMap;
@@ -40,12 +40,6 @@ public final class ClusterHelper {
private ClusterHelper() {
}
- public static FileSystem getFileSystem(String cluster) throws FalconException {
- Cluster clusterEntity = ConfigurationStore.get().get(EntityType.CLUSTER, cluster);
- Configuration conf = ClusterHelper.getConfiguration(clusterEntity);
- return HadoopClientFactory.get().createProxiedFileSystem(conf);
- }
-
public static Configuration getConfiguration(Cluster cluster) {
Configuration conf = new Configuration();
@@ -116,10 +110,6 @@ public final class ClusterHelper {
return normalizedPath.substring(0, normalizedPath.length() - 1);
}
- public static String getCompleteLocation(Cluster cluster, String locationKey) {
- return getStorageUrl(cluster) + "/" + getLocation(cluster, locationKey);
- }
-
public static String getLocation(Cluster cluster, String locationKey) {
for (Location loc : cluster.getLocations().getLocations()) {
if (loc.getName().equals(locationKey)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 4d9fbcf..5b5d62c 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Vertex;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.CatalogStorage;
-import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.common.FeedDataPath;
@@ -34,12 +33,10 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.net.URISyntaxException;
@@ -51,6 +48,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
private static final Logger LOG = LoggerFactory.getLogger(InstanceRelationshipGraphBuilder.class);
private static final String FEED_INSTANCE_FORMAT = "yyyyMMddHHmm"; // computed
+ private static final String IGNORE = "IGNORE";
// process workflow properties from message
private static final WorkflowExecutionArgs[] INSTANCE_WORKFLOW_PROPERTIES = {
@@ -207,39 +205,33 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public void addEvictedInstance(WorkflowExecutionContext context) throws FalconException {
- String outputFeedNamesArg = context.getOutputFeedNames();
- if ("NONE".equals(outputFeedNamesArg)) {
- LOG.info("There are no output feeds for this process, return");
+ final String outputFeedPaths = context.getOutputFeedInstancePaths();
+ if (IGNORE.equals(outputFeedPaths)) {
+ LOG.info("There were no evicted instances, nothing to record");
return;
}
- String logFile = context.getLogFile();
- if (StringUtils.isEmpty(logFile)){
- throw new IllegalArgumentException("csv log file path empty");
- }
-
+ LOG.info("Recording lineage for evicted instances {}", outputFeedPaths);
+ // For retention there will be only one output feed name
+ String feedName = context.getOutputFeedNames();
+ String[] evictedFeedInstancePathList = context.getOutputFeedInstancePathsList();
String clusterName = context.getClusterName();
- String[] paths = EvictionHelper.getInstancePaths(
- ClusterHelper.getFileSystem(clusterName), new Path(logFile));
- if (paths == null || paths.length <= 0) {
- throw new IllegalArgumentException("No instance paths in log file");
- }
- // For retention there will be only one output feed name
- String feedName = outputFeedNamesArg;
- for (String feedInstanceDataPath : paths) {
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + clusterName);
- RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+ for (String evictedFeedInstancePath : evictedFeedInstancePathList) {
+ LOG.info("Computing feed instance for : name= {}, path={}, in cluster: {}",
+ feedName, evictedFeedInstancePath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
- feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
+ evictedFeedInstancePath, context.getNominalTimeAsISO8601());
+ Vertex feedInstanceVertex = findVertex(feedInstanceName,
+ RelationshipType.FEED_INSTANCE);
LOG.info("Vertex exists? name={}, type={}, v={}",
- feedInstanceName, vertexType, feedInstanceVertex);
- if (feedInstanceVertex == null) {
- throw new IllegalStateException(vertexType
- + " instance vertex must exist " + feedInstanceName);
+ feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+ if (feedInstanceVertex == null) { // No record of instances NOT generated by Falcon
+ LOG.info("{} instance vertex {} does not exist, add it",
+ RelationshipType.FEED_INSTANCE, feedInstanceName);
+ feedInstanceVertex = addFeedInstance(// add a new instance
+ feedInstanceName, context, feedName, clusterName);
}
addInstanceToEntity(feedInstanceVertex, clusterName, RelationshipType.CLUSTER_ENTITY,
@@ -251,16 +243,20 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
WorkflowExecutionContext context, String feedName,
String feedInstanceDataPath) throws FalconException {
String clusterName = context.getClusterName();
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + clusterName);
+ LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+ feedInstanceDataPath, clusterName);
String feedInstanceName = getFeedInstanceName(feedName, clusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- LOG.info("Adding feed instance: " + feedInstanceName);
+ Vertex feedInstance = addFeedInstance(feedInstanceName, context, feedName, clusterName);
+ addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
+ }
+
+ private Vertex addFeedInstance(String feedInstanceName, WorkflowExecutionContext context,
+ String feedName, String clusterName) throws FalconException {
+ LOG.info("Adding feed instance {}", feedInstanceName);
Vertex feedInstance = addVertex(feedInstanceName, RelationshipType.FEED_INSTANCE,
context.getTimeStampAsISO8601());
- addProcessFeedEdge(processInstance, feedInstance, edgeLabel);
-
addInstanceToEntity(feedInstance, feedName,
RelationshipType.FEED_ENTITY, RelationshipLabel.INSTANCE_ENTITY_EDGE);
addInstanceToEntity(feedInstance, clusterName,
@@ -273,6 +269,8 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
addDataClassification(feed.getTags(), feedInstance);
addGroups(feed.getGroups(), feedInstance);
}
+
+ return feedInstance;
}
public static String getFeedInstanceName(String feedName, String clusterName,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index f607e0a..46f8a61 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -271,6 +271,7 @@ public class MetadataMappingService
case DELETE:
onFeedInstanceEvicted(context);
+ getTransactionalGraph().commit();
break;
default:
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
new file mode 100644
index 0000000..c2f222b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/retention/EvictedInstanceSerDe.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.retention;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Utility class for serializing and deserializing the evicted instance paths.
+ */
+
+public final class EvictedInstanceSerDe {
+
+ private static final Logger LOG = LoggerFactory.getLogger(EvictedInstanceSerDe.class);
+
+ private static final String INSTANCEPATH_PREFIX = "instancePaths=";
+ private static final String INSTANCES_SEPARATOR = "=";
+ public static final String INSTANCEPATH_SEPARATOR = ",";
+
+
+ private EvictedInstanceSerDe() {}
+
+ /**
+ * This method serializes the evicted instances to a file in logs dir for a given feed.
+ * @see org.apache.falcon.retention.FeedEvictor
+ *
+ * *Note:* This is executed within the map task for the evictor action
+ *
+ * @param fileSystem file system handle
+ * @param logFilePath File path to serialize the instances to
+ * @param instances list of instances, comma separated
+ * @throws IOException
+ */
+ public static void serializeEvictedInstancePaths(final FileSystem fileSystem,
+ final Path logFilePath,
+ StringBuffer instances) throws IOException {
+ LOG.info("Writing deleted instances {} to path {}", instances, logFilePath);
+ OutputStream out = null;
+ try {
+ out = fileSystem.create(logFilePath);
+ instances.insert(0, INSTANCEPATH_PREFIX); // add the prefix
+ out.write(instances.toString().getBytes());
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ logEvictedInstancePaths(fileSystem, logFilePath);
+ }
+ }
+
+ private static void logEvictedInstancePaths(final FileSystem fs,
+ final Path outPath) throws IOException {
+ ByteArrayOutputStream writer = new ByteArrayOutputStream();
+ InputStream instance = fs.open(outPath);
+ IOUtils.copyBytes(instance, writer, 4096, true);
+ LOG.debug("Instance Paths copied to {}", outPath);
+ LOG.debug("Written {}", writer);
+ }
+
+ /**
+ * This method deserializes the evicted instances from a log file on hdfs.
+ * @see org.apache.falcon.messaging.JMSMessageProducer
+ * *Note:* This is executed within the falcon server
+ *
+ * @param fileSystem file system handle
+ * @param logFile File path to deserialize the instances from; deleted after it is read
+ * @return array of evicted instance paths, empty if none were recorded
+ * @throws IOException
+ */
+ public static String[] deserializeEvictedInstancePaths(final FileSystem fileSystem,
+ final Path logFile) throws IOException {
+ try {
+ ByteArrayOutputStream writer = new ByteArrayOutputStream();
+ InputStream instance = fileSystem.open(logFile);
+ IOUtils.copyBytes(instance, writer, 4096, true);
+ String[] instancePaths = writer.toString().split(INSTANCES_SEPARATOR);
+
+ LOG.info("Deleted feed instance paths file:" + logFile);
+ if (instancePaths.length == 1) {
+ LOG.debug("Returning 0 instance paths for feed ");
+ return new String[0];
+ } else {
+ LOG.debug("Returning instance paths for feed " + instancePaths[1]);
+ return instancePaths[1].split(INSTANCEPATH_SEPARATOR);
+ }
+ } finally {
+ // clean up the serialized state
+ fileSystem.delete(logFile, true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
deleted file mode 100644
index 5d6481c..0000000
--- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.retention;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.falcon.FalconException;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Helper methods to facilitate eviction.
- */
-
-public final class EvictionHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(EvictionHelper.class);
-
- private static final String INSTANCEPATH_FORMAT = "instancePaths=";
- public static final String INSTANCEPATH_SEPARATOR = ",";
-
-
- private EvictionHelper() {}
-
- public static void logInstancePaths(final FileSystem logfs, final Path path,
- final String data) throws IOException {
- LOG.info("Writing deleted instances to path {}", path);
- OutputStream out = logfs.create(path);
- out.write(INSTANCEPATH_FORMAT.getBytes());
- out.write(data.getBytes());
- out.close();
- debug(logfs, path);
- }
-
- public static String[] getInstancePaths(final FileSystem fs,
- final Path logFile) throws FalconException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- try {
- InputStream date = fs.open(logFile);
- IOUtils.copyBytes(date, writer, 4096, true);
- } catch (IOException e) {
- throw new FalconException(e);
- }
- String logData = writer.toString();
- if (StringUtils.isEmpty(logData)) {
- throw new FalconException("csv file is empty");
- }
-
- String[] parts = logData.split(INSTANCEPATH_FORMAT);
- if (parts.length != 2) {
- throw new FalconException("Instance path in csv file not in required format: " + logData);
- }
-
- // part[0] is instancePaths=
- return parts[1].split(INSTANCEPATH_SEPARATOR);
- }
-
- private static void debug(final FileSystem fs, final Path outPath) throws IOException {
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream instance = fs.open(outPath);
- IOUtils.copyBytes(instance, writer, 4096, true);
- LOG.debug("Instance Paths copied to {}", outPath);
- LOG.debug("Written {}", writer);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
index 514bafe..0a8be64 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionArgs.java
@@ -75,7 +75,7 @@ public enum WorkflowExecutionArgs {
BRKR_TTL("brokerTTL", "time to live for broker message in sec", false),
// state maintained
- LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
+ LOG_FILE("logFile", "log file path where feeds to be deleted are recorded", false),
// execution context data recorded
LOG_DIR("logDir", "log dir where lineage can be recorded"),
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index 04ef037..ef55ba9 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -87,7 +87,7 @@ public class WorkflowExecutionContext {
WorkflowExecutionArgs.RUN_ID,
WorkflowExecutionArgs.STATUS,
WorkflowExecutionArgs.TIMESTAMP,
- WorkflowExecutionArgs.LOG_FILE,
+ WorkflowExecutionArgs.LOG_DIR,
};
private final Map<WorkflowExecutionArgs, String> context;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 3b9fdba..895a5f7 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -23,7 +23,6 @@ import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Graph;
import com.tinkerpop.blueprints.GraphQuery;
import com.tinkerpop.blueprints.Vertex;
-import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.cluster.util.EntityBuilderTestUtil;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
@@ -41,7 +40,7 @@ import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Outputs;
import org.apache.falcon.entity.v0.process.Process;
-import org.apache.falcon.retention.EvictionHelper;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.StartupProperties;
@@ -49,7 +48,6 @@ import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import static org.apache.falcon.workflow.WorkflowExecutionContext.EntityOperations;
import org.apache.falcon.workflow.WorkflowJobEndNotificationService;
-import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
@@ -70,7 +68,6 @@ public class MetadataMappingServiceTest {
public static final String FALCON_USER = "falcon-user";
private static final String LOGS_DIR = "/falcon/staging/feed/logs";
- private static final String LOG_FILE = "instancePaths-2014-01-01-01-00.csv";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
public static final String CLUSTER_ENTITY_NAME = "primary-cluster";
@@ -93,7 +90,8 @@ public class MetadataMappingServiceTest {
public static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
public static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
- private static final String REPLICATED_INSTANCE = "raw-click";
+ private static final String REPLICATED_FEED = "raw-click";
+ private static final String EVICTED_FEED = "imp-click-join1";
private static final String EVICTED_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join1/20140102";
public static final String OUTPUT_INSTANCE_PATHS_NO_DATE =
@@ -109,10 +107,6 @@ public class MetadataMappingServiceTest {
private List<Feed> inputFeeds = new ArrayList<Feed>();
private List<Feed> outputFeeds = new ArrayList<Feed>();
private Process processEntity;
- private EmbeddedCluster embeddedCluster;
- private String hdfsUrl;
- private static String logFilePath;
-
@BeforeClass
public void setUp() throws Exception {
@@ -121,6 +115,8 @@ public class MetadataMappingServiceTest {
configStore = ConfigurationStore.get();
Services.get().register(new WorkflowJobEndNotificationService());
+ StartupProperties.get().setProperty("falcon.graph.storage.directory",
+ "target/graphdb-" + System.currentTimeMillis());
StartupProperties.get().setProperty("falcon.graph.preserve.history", "true");
service = new MetadataMappingService();
service.init();
@@ -258,7 +254,8 @@ public class MetadataMappingServiceTest {
GraphUtils.dump(service.getGraph());
// Verify if instance name has nominal time
- List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(RelationshipType.FEED_INSTANCE.getName());
+ List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(
+ RelationshipType.FEED_INSTANCE.getName());
List<String> expected = Arrays.asList("impression-feed/2014-01-01T01:00Z", "clicks-feed/2014-01-01T01:00Z",
"imp-click-join1/2014-01-01T01:00Z", "imp-click-join2/2014-01-01T01:00Z");
Assert.assertTrue(feedNamesOwnedByUser.containsAll(expected));
@@ -270,20 +267,20 @@ public class MetadataMappingServiceTest {
}
@Test
- public void testLineageForReplication() throws Exception {
+ public void testLineageForReplication() throws Exception {
setupForLineageReplication();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_INSTANCE,
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
"jail://global:00/falcon/raw-click/bcp/20140101",
- "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_INSTANCE),
+ "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
debug(service.getGraph());
GraphUtils.dump(service.getGraph());
- verifyLineageGraphForReplicationOrEviction(REPLICATED_INSTANCE,
+ verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
"jail://global:00/falcon/raw-click/bcp/20140101", context,
RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
@@ -303,12 +300,11 @@ public class MetadataMappingServiceTest {
}
@Test
- public void testLineageForRetention() throws Exception {
- setupForLineageEviciton();
- String feedName = "imp-click-join1";
+ public void testLineageForRetention() throws Exception {
+ setupForLineageEviction();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
- feedName, "IGNORE", "IGNORE", feedName),
+ EVICTED_FEED, EVICTED_INSTANCE_PATHS, "IGNORE", EVICTED_FEED),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
@@ -322,9 +318,9 @@ public class MetadataMappingServiceTest {
List<String> ownedAndSecureFeeds = Arrays.asList("clicks-feed/2014-01-01T00:00Z",
"imp-click-join1/2014-01-01T00:00Z", "imp-click-join1/2014-01-02T00:00Z");
verifyLineageGraph(RelationshipType.FEED_INSTANCE.getName(), expectedFeeds, secureFeeds, ownedAndSecureFeeds);
- String[] paths = EVICTED_INSTANCE_PATHS.split(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ String[] paths = EVICTED_INSTANCE_PATHS.split(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
for (String feedInstanceDataPath : paths) {
- verifyLineageGraphForReplicationOrEviction(feedName, feedInstanceDataPath, context,
+ verifyLineageGraphForReplicationOrEviction(EVICTED_FEED, feedInstanceDataPath, context,
RelationshipLabel.FEED_CLUSTER_EVICTED_EDGE);
}
@@ -336,6 +332,27 @@ public class MetadataMappingServiceTest {
Assert.assertEquals(getEdgesCount(service.getGraph()), 72);
}
+ @Test
+ public void testLineageForRetentionWithNoFeedsEvicted() throws Exception {
+ cleanUp();
+ service.init();
+ long beforeVerticesCount = getVerticesCount(service.getGraph());
+ long beforeEdgesCount = getEdgesCount(service.getGraph());
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.DELETE, EVICTION_WORKFLOW_NAME,
+ EVICTED_FEED, "IGNORE", "IGNORE", EVICTED_FEED),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+ // No new vertices added
+ Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount);
+ // No new edges added
+ Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount);
+ }
+
@Test (dependsOnMethods = "testOnAdd")
public void testOnChange() throws Exception {
// shutdown the graph and resurrect for testing
@@ -673,7 +690,9 @@ public class MetadataMappingServiceTest {
// feeds owned by a user
List<String> feedNamesOwnedByUser = getFeedsOwnedByAUser(feedType.getName());
Assert.assertEquals(feedNamesOwnedByUser,
- Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1", "imp-click-join2"));
+ Arrays.asList("impression-feed", "clicks-feed", "imp-click-join1",
+ "imp-click-join2")
+ );
// feeds classified as secure
verifyFeedsClassifiedAsSecure(feedType.getName(),
@@ -736,7 +755,8 @@ public class MetadataMappingServiceTest {
}
}
}
- Assert.assertTrue(actual.containsAll(expected), "Actual does not contain expected: " + actual);
+ Assert.assertTrue(actual.containsAll(expected),
+ "Actual does not contain expected: " + actual);
}
public long getVerticesCount(final Graph graph) {
@@ -855,8 +875,6 @@ public class MetadataMappingServiceTest {
"-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
- "-" + WorkflowExecutionArgs.LOG_FILE.getName(),
- (logFilePath != null ? logFilePath : LOGS_DIR + "/log" + ".txt"),
};
}
@@ -907,7 +925,7 @@ public class MetadataMappingServiceTest {
Cluster[] clusters = {clusterEntity, bcpCluster};
// Add feed
- Feed rawFeed = addFeedEntity(REPLICATED_INSTANCE, clusters,
+ Feed rawFeed = addFeedEntity(REPLICATED_FEED, clusters,
"classified-as=Secure", "analytics", Storage.TYPE.FILESYSTEM,
"/falcon/raw-click/${YEAR}/${MONTH}/${DAY}");
// Add uri template for each cluster
@@ -945,22 +963,12 @@ public class MetadataMappingServiceTest {
EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
"jail://global:00/falcon/imp-click-join1/20140101",
"jail://global:00/falcon/raw-click/primary/20140101",
- REPLICATED_INSTANCE), WorkflowExecutionContext.Type.POST_PROCESSING);
+ REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
}
- private void setupForLineageEviciton() throws Exception {
- cleanUp();
- service.init();
-
- // Add cluster
- embeddedCluster = EmbeddedCluster.newCluster(CLUSTER_ENTITY_NAME, true, COLO_NAME,
- "classification=production");
- clusterEntity = embeddedCluster.getCluster();
- configStore.publish(EntityType.CLUSTER, clusterEntity);
- hdfsUrl = embeddedCluster.getConf().get("fs.default.name");
-
- addFeedsAndProcess(clusterEntity);
+ private void setupForLineageEviction() throws Exception {
+ setup();
// GENERATE WF should have run before this to create all instance related vertices
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -968,12 +976,6 @@ public class MetadataMappingServiceTest {
"imp-click-join1,imp-click-join1", EVICTED_INSTANCE_PATHS, null, null),
WorkflowExecutionContext.Type.POST_PROCESSING);
service.onSuccess(context);
-
- // Write to csv file
- String csvData = EVICTED_INSTANCE_PATHS;
- logFilePath = hdfsUrl + LOGS_DIR + "/" + LOG_FILE;
- Path path = new Path(logFilePath);
- EvictionHelper.logInstancePaths(path.getFileSystem(EmbeddedCluster.newConfiguration()), path, csvData);
}
private void setupForNoDateInFeedPath() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
----------------------------------------------------------------------
diff --git a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
index 0181e74..a60e951 100644
--- a/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
+++ b/messaging/src/main/java/org/apache/falcon/messaging/JMSMessageProducer.java
@@ -18,12 +18,12 @@
package org.apache.falcon.messaging;
+import org.apache.falcon.retention.EvictedInstanceSerDe;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,9 +35,7 @@ import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
@@ -290,19 +288,7 @@ public class JMSMessageProducer {
return new String[0];
}
- ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream instance = fs.open(logFile);
- IOUtils.copyBytes(instance, writer, 4096, true);
- String[] instancePaths = writer.toString().split("=");
- fs.delete(logFile, true);
- LOG.info("Deleted feed instance paths file:" + logFile);
- if (instancePaths.length == 1) {
- LOG.debug("Returning 0 instance paths for feed ");
- return new String[0];
- } else {
- LOG.debug("Returning instance paths for feed " + instancePaths[1]);
- return instancePaths[1].split(",");
- }
+ return EvictedInstanceSerDe.deserializeEvictedInstancePaths(fs, logFile);
}
private Map<String, String> buildMessage(final WorkflowExecutionArgs[] filter) {
@@ -311,6 +297,8 @@ public class JMSMessageProducer {
message.put(arg.getName(), context.getValue(arg));
}
+ // this is NOT useful since the file is deleted after message is sent
+ message.remove(WorkflowExecutionArgs.LOG_FILE.getName());
return message;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/00c6f1e5/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
----------------------------------------------------------------------
diff --git a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
index 114071f..9589edf 100644
--- a/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
+++ b/retention/src/main/java/org/apache/falcon/retention/FeedEvictor.java
@@ -126,7 +126,8 @@ public class FeedEvictor extends Configured implements Tool {
evict(storage, retentionLimit, timeZone);
Path path = new Path(logFile);
- EvictionHelper.logInstancePaths(path.getFileSystem(getConf()), path, instancePaths.toString());
+ EvictedInstanceSerDe.serializeEvictedInstancePaths(
+ path.getFileSystem(getConf()), path, instancePaths);
int len = buffer.length();
if (len > 0) {
@@ -180,7 +181,7 @@ public class FeedEvictor extends Configured implements Tool {
deleteInstance(fs, path, feedBasePath);
Date date = getDate(path, feedPath, dateMask, timeZone);
buffer.append(dateFormat.format(date)).append(',');
- instancePaths.append(path).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ instancePaths.append(path).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
@@ -532,7 +533,7 @@ public class FeedEvictor extends Configured implements Tool {
String partitionInfo = partitionToDrop.getValues().toString().replace("," , ";");
LOG.info("Deleted partition: " + partitionInfo);
buffer.append(partSpec).append(',');
- instancePaths.append(partitionInfo).append(EvictionHelper.INSTANCEPATH_SEPARATOR);
+ instancePaths.append(partitionInfo).append(EvictedInstanceSerDe.INSTANCEPATH_SEPARATOR);
}
}
}
@@ -553,5 +554,4 @@ public class FeedEvictor extends Configured implements Tool {
}
}
}
-
}
[4/4] git commit: FALCON-732 Lineage capture fails for an instance
thats not generated by falcon. Contributed by Sowmya Ramesh
Posted by ve...@apache.org.
FALCON-732 Lineage capture fails for an instance thats not generated by falcon. Contributed by Sowmya Ramesh
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/33b420b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/33b420b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/33b420b5
Branch: refs/heads/master
Commit: 33b420b5d04cd08c936ddf9d6e8081eab32015c4
Parents: 00c6f1e
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Wed Sep 17 16:48:30 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Wed Sep 17 16:56:57 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../InstanceRelationshipGraphBuilder.java | 35 ++++-----
.../falcon/metadata/MetadataMappingService.java | 1 +
.../workflow/WorkflowExecutionContext.java | 22 +++++-
.../metadata/MetadataMappingServiceTest.java | 82 ++++++++++++++------
.../feed/FeedReplicationCoordinatorBuilder.java | 4 +
.../feed/OozieFeedWorkflowBuilderTest.java | 6 +-
.../falcon/oozie/process/AbstractTestBase.java | 15 +++-
8 files changed, 119 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a2bd724..0558ab4 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -94,6 +94,9 @@ Trunk (Unreleased)
OPTIMIZATIONS
BUG FIXES
+ FALCON-732 Lineage capture fails for an instance thats not generated by
+ falcon (Sowmya Ramesh via Venkatesh Seetharam)
+
FALCON-731 Lineage capture for evicted instance is broken
(Sowmya Ramesh via Venkatesh Seetharam)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
index 5b5d62c..764e732 100644
--- a/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
+++ b/common/src/main/java/org/apache/falcon/metadata/InstanceRelationshipGraphBuilder.java
@@ -68,7 +68,7 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
public Vertex addProcessInstance(WorkflowExecutionContext context) throws FalconException {
String processInstanceName = getProcessInstanceName(context);
- LOG.info("Adding process instance: " + processInstanceName);
+ LOG.info("Adding process instance: {}", processInstanceName);
Vertex processInstance = addVertex(processInstanceName,
RelationshipType.PROCESS_INSTANCE, context.getTimeStampAsISO8601());
@@ -175,29 +175,24 @@ public class InstanceRelationshipGraphBuilder extends RelationshipGraphBuilder {
}
public void addReplicatedInstance(WorkflowExecutionContext context) throws FalconException {
- String outputFeedNamesArg = context.getOutputFeedNames();
- if ("NONE".equals(outputFeedNamesArg)) {
- return; // there are no output feeds
- }
-
- String[] outputFeedNames = context.getOutputFeedNamesList();
- String[] outputFeedInstancePaths = context.getOutputFeedInstancePathsList();
+ // For replication there will be only one output feed name and path
+ String feedName = context.getOutputFeedNames();
+ String feedInstanceDataPath = context.getOutputFeedInstancePaths();
String targetClusterName = context.getClusterName();
- // For replication there will be only one output feed name
- String feedName = outputFeedNames[0];
- String feedInstanceDataPath = outputFeedInstancePaths[0];
-
- LOG.info("Computing feed instance for : name=" + feedName + ", path= "
- + feedInstanceDataPath + ", in cluster: " + targetClusterName);
- RelationshipType vertexType = RelationshipType.FEED_INSTANCE;
+ LOG.info("Computing feed instance for : name= {} path= {}, in cluster: {}", feedName,
+ feedInstanceDataPath, targetClusterName);
String feedInstanceName = getFeedInstanceName(feedName, targetClusterName,
feedInstanceDataPath, context.getNominalTimeAsISO8601());
- Vertex feedInstanceVertex = findVertex(feedInstanceName, vertexType);
-
- LOG.info("Vertex exists? name={}, type={}, v={}", feedInstanceName, vertexType, feedInstanceVertex);
- if (feedInstanceVertex == null) {
- throw new IllegalStateException(vertexType + " instance vertex must exist " + feedInstanceName);
+ Vertex feedInstanceVertex = findVertex(feedInstanceName, RelationshipType.FEED_INSTANCE);
+
+ LOG.info("Vertex exists? name={}, type={}, v={}",
+ feedInstanceName, RelationshipType.FEED_INSTANCE, feedInstanceVertex);
+ if (feedInstanceVertex == null) { // instance NOT generated by Falcon, so no vertex was recorded for it
+ LOG.info("{} instance vertex {} does not exist, add it",
+ RelationshipType.FEED_INSTANCE, feedInstanceName);
+ feedInstanceVertex = addFeedInstance( // add a new instance
+ feedInstanceName, context, feedName, context.getSrcClusterName());
}
addInstanceToEntity(feedInstanceVertex, targetClusterName, RelationshipType.CLUSTER_ENTITY,
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
index 46f8a61..0a77bf1 100644
--- a/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
+++ b/common/src/main/java/org/apache/falcon/metadata/MetadataMappingService.java
@@ -267,6 +267,7 @@ public class MetadataMappingService
case REPLICATE:
onFeedInstanceReplicated(context);
+ getTransactionalGraph().commit();
break;
case DELETE:
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
index ef55ba9..4c94c27 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java
@@ -54,6 +54,7 @@ public class WorkflowExecutionContext {
public static final String OUTPUT_FEED_SEPARATOR = ",";
public static final String INPUT_FEED_SEPARATOR = "#";
+ public static final String CLUSTER_NAME_SEPARATOR = ",";
/**
* Workflow execution status.
@@ -160,7 +161,26 @@ public class WorkflowExecutionContext {
}
public String getClusterName() {
- return getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ return value.split(CLUSTER_NAME_SEPARATOR)[0];
+ }
+
+ public String getSrcClusterName() {
+ String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME);
+ if (EntityOperations.REPLICATE != getOperation()) {
+ return value;
+ }
+
+ String[] parts = value.split(CLUSTER_NAME_SEPARATOR);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException("Replicated cluster pair is missing in " + value);
+ }
+
+ return parts[1];
}
public String getEntityName() {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
index 895a5f7..11d27fe 100644
--- a/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
+++ b/common/src/test/java/org/apache/falcon/metadata/MetadataMappingServiceTest.java
@@ -169,7 +169,7 @@ public class MetadataMappingServiceTest {
"/falcon/impression-feed/${YEAR}/${MONTH}/${DAY}");
inputFeeds.add(impressionsFeed);
verifyEntityWasAddedToGraph(impressionsFeed.getName(), RelationshipType.FEED_ENTITY);
- verifyFeedEntityEdges(impressionsFeed.getName());
+ verifyFeedEntityEdges(impressionsFeed.getName(), "Secure", "analytics");
Assert.assertEquals(getVerticesCount(service.getGraph()), 7); // +4 = feed, tag, group, user
Assert.assertEquals(getEdgesCount(service.getGraph()), 6); // +4 = cluster, tag, group, user
@@ -300,6 +300,37 @@ public class MetadataMappingServiceTest {
}
@Test
+ public void testLineageForReplicationForNonGeneratedInstances() throws Exception {
+ cleanUp();
+ service.init();
+
+ addClusterAndFeedForReplication();
+ // Get the vertices before running replication WF
+ long beforeVerticesCount = getVerticesCount(service.getGraph());
+ long beforeEdgesCount = getEdgesCount(service.getGraph());
+
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.REPLICATE, REPLICATION_WORKFLOW_NAME, REPLICATED_FEED,
+ "jail://global:00/falcon/raw-click/bcp/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101", REPLICATED_FEED),
+ WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+
+ debug(service.getGraph());
+ GraphUtils.dump(service.getGraph());
+
+ verifyFeedEntityEdges(REPLICATED_FEED, "Secure", "analytics");
+ verifyLineageGraphForReplicationOrEviction(REPLICATED_FEED,
+ "jail://global:00/falcon/raw-click/bcp/20140101", context,
+ RelationshipLabel.FEED_CLUSTER_REPLICATED_EDGE);
+
+ // +1 for the new instance vertex added
+ Assert.assertEquals(getVerticesCount(service.getGraph()), beforeVerticesCount + 1);
+ // +6 = instance-of, stored-in, owned-by, classification, group, replicated-to
+ Assert.assertEquals(getEdgesCount(service.getGraph()), beforeEdgesCount + 6);
+ }
+
+ @Test
public void testLineageForRetention() throws Exception {
setupForLineageEviction();
WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
@@ -605,7 +636,7 @@ public class MetadataMappingServiceTest {
"production", RelationshipType.TAGS.getName());
}
- private void verifyFeedEntityEdges(String feedName) {
+ private void verifyFeedEntityEdges(String feedName, String tag, String group) {
Vertex feedVertex = getEntityVertex(feedName, RelationshipType.FEED_ENTITY);
// verify edge to cluster vertex
@@ -614,12 +645,13 @@ public class MetadataMappingServiceTest {
// verify edge to user vertex
verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.USER.getName(),
FALCON_USER, RelationshipType.USER.getName());
+
// verify edge to tags vertex
verifyVertexForEdge(feedVertex, Direction.OUT, "classified-as",
- "Secure", RelationshipType.TAGS.getName());
+ tag, RelationshipType.TAGS.getName());
// verify edge to group vertex
verifyVertexForEdge(feedVertex, Direction.OUT, RelationshipLabel.GROUPS.getName(),
- "analytics", RelationshipType.GROUPS.getName());
+ group, RelationshipType.GROUPS.getName());
}
private void verifyProcessEntityEdges() {
@@ -834,7 +866,7 @@ public class MetadataMappingServiceTest {
String falconInputFeeds) {
String cluster;
if (EntityOperations.REPLICATE == operation) {
- cluster = BCP_CLUSTER_ENTITY_NAME;
+ cluster = BCP_CLUSTER_ENTITY_NAME + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + CLUSTER_ENTITY_NAME;
} else {
cluster = CLUSTER_ENTITY_NAME;
}
@@ -916,6 +948,28 @@ public class MetadataMappingServiceTest {
cleanUp();
service.init();
+ addClusterAndFeedForReplication();
+
+ // Add output feed
+ Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
+ "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
+ "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
+ outputFeeds.add(join1Feed);
+
+ processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
+ "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
+ WORKFLOW_VERSION);
+
+ // GENERATE WF should have run before this to create all instance related vertices
+ WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
+ EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
+ "jail://global:00/falcon/imp-click-join1/20140101",
+ "jail://global:00/falcon/raw-click/primary/20140101",
+ REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
+ service.onSuccess(context);
+ }
+
+ private void addClusterAndFeedForReplication() throws Exception {
// Add cluster
clusterEntity = addClusterEntity(CLUSTER_ENTITY_NAME, COLO_NAME,
"classification=production");
@@ -947,24 +1001,6 @@ public class MetadataMappingServiceTest {
configStore.cleanupUpdateInit();
}
inputFeeds.add(rawFeed);
-
- // Add output feed
- Feed join1Feed = addFeedEntity("imp-click-join1", clusterEntity,
- "classified-as=Financial", "reporting,bi", Storage.TYPE.FILESYSTEM,
- "/falcon/imp-click-join1/${YEAR}${MONTH}${DAY}");
- outputFeeds.add(join1Feed);
-
- processEntity = addProcessEntity(PROCESS_ENTITY_NAME, clusterEntity,
- "classified-as=Critical", "testPipeline,dataReplication_Pipeline", GENERATE_WORKFLOW_NAME,
- WORKFLOW_VERSION);
-
- // GENERATE WF should have run before this to create all instance related vertices
- WorkflowExecutionContext context = WorkflowExecutionContext.create(getTestMessageArgs(
- EntityOperations.GENERATE, GENERATE_WORKFLOW_NAME, "imp-click-join1",
- "jail://global:00/falcon/imp-click-join1/20140101",
- "jail://global:00/falcon/raw-click/primary/20140101",
- REPLICATED_FEED), WorkflowExecutionContext.Type.POST_PROCESSING);
- service.onSuccess(context);
}
private void setupForLineageEviction() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
index 966f90e..801d733 100644
--- a/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/oozie/feed/FeedReplicationCoordinatorBuilder.java
@@ -159,6 +159,10 @@ public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<F
workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
+ // Override CLUSTER_NAME property to include both source and target cluster pair
+ String clusterProperty = trgCluster.getName()
+ + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
+ props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
index 3c49353..379cf34 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/feed/OozieFeedWorkflowBuilderTest.java
@@ -192,7 +192,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
HashMap<String, String> props = getCoordProperties(coord);
- verifyEntityProperties(feed, trgCluster,
+ verifyEntityProperties(feed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
@@ -332,7 +332,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
Assert.assertEquals(props.get("maxMaps"), "33");
Assert.assertEquals(props.get("mapBandwidthKB"), "2048");
- verifyEntityProperties(aFeed, aCluster,
+ verifyEntityProperties(aFeed, aCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
@@ -456,7 +456,7 @@ public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
- verifyEntityProperties(tableFeed, trgCluster,
+ verifyEntityProperties(tableFeed, trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
verifyBrokerProperties(trgCluster, props);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/33b420b5/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
index b547c31..b549cfb 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/process/AbstractTestBase.java
@@ -214,18 +214,29 @@ public class AbstractTestBase {
return props;
}
- protected void verifyEntityProperties(Entity entity, Cluster cluster,
+ protected void verifyEntityProperties(Entity entity, Cluster cluster, Cluster srcCluster,
WorkflowExecutionContext.EntityOperations operation,
HashMap<String, String> props) throws Exception {
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_NAME.getName()),
entity.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.ENTITY_TYPE.getName()),
entity.getEntityType().name());
- Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ if (WorkflowExecutionContext.EntityOperations.REPLICATE == operation) {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()),
+ cluster.getName() + WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName());
+ } else {
+ Assert.assertEquals(props.get(WorkflowExecutionArgs.CLUSTER_NAME.getName()), cluster.getName());
+ }
Assert.assertEquals(props.get(WorkflowExecutionArgs.LOG_DIR.getName()), getLogPath(cluster, entity));
Assert.assertEquals(props.get("falconDataOperation"), operation.name());
}
+ protected void verifyEntityProperties(Entity entity, Cluster cluster,
+ WorkflowExecutionContext.EntityOperations operation,
+ HashMap<String, String> props) throws Exception {
+ verifyEntityProperties(entity, cluster, null, operation, props);
+ }
+
private String getLogPath(Cluster cluster, Entity entity) {
Path logPath = EntityUtil.getLogPath(cluster, entity);
return (logPath.toUri().getScheme() == null ? "${nameNode}" : "") + logPath;