Posted to commits@ambari.apache.org by ma...@apache.org on 2012/11/07 09:50:03 UTC

svn commit: r1406498 [8/8] - in /incubator/ambari/branches/AMBARI-666: ./ ambari-agent/src/main/puppet/modules/hdp/manifests/ ambari-agent/src/main/puppet/modules/hdp/manifests/lib/ ambari-agent/src/main/python/ambari_agent/ ambari-server/src/main/java...

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WorkflowContext {
+  
+  private String workflowId;
+  private String workflowName;
+  private String workflowEntityName;
+  
+  private WorkflowDag workflowDag;
+  
+  private WorkflowContext parentWorkflowContext;
+  
+  public WorkflowContext() {
+    /* Required by JAXB. */
+  }
+  
+  /* Getters. */
+  public String getWorkflowId() {
+    return this.workflowId;
+  }
+  
+  public String getWorkflowName() {
+    return this.workflowName;
+  }
+  
+  public String getWorkflowEntityName() {
+    return this.workflowEntityName;
+  }
+  
+  public WorkflowDag getWorkflowDag() {
+    return this.workflowDag;
+  }
+  
+  public WorkflowContext getParentWorkflowContext() {
+    return this.parentWorkflowContext;
+  }
+  
+  /* Setters. */
+  public void setWorkflowId(String wfId) {
+    this.workflowId = wfId;
+  }
+  
+  public void setWorkflowName(String wfName) {
+    this.workflowName = wfName;
+  }
+  
+  public void setWorkflowEntityName(String wfEntityName) {
+    this.workflowEntityName = wfEntityName;
+  }
+  
+  public void setWorkflowDag(WorkflowDag wfDag) {
+    this.workflowDag = wfDag;
+  }
+  
+  public void setParentWorkflowContext(WorkflowContext pWfContext) {
+    this.parentWorkflowContext = pWfContext;
+  }
+}
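
For illustration only (not part of this commit), a minimal sketch of how this JAXB-annotated bean round-trips; the workflow id and name values are made up:

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

import org.apache.ambari.eventdb.model.WorkflowContext;

public class WorkflowContextMarshalDemo {
  public static void main(String[] args) throws Exception {
    WorkflowContext wc = new WorkflowContext();
    wc.setWorkflowId("mr_example_0001");    // illustrative value
    wc.setWorkflowName("example-workflow"); // illustrative value
    wc.setWorkflowEntityName("X");

    // The no-arg constructor and @XmlAccessorType(FIELD) are what let JAXB
    // (and, via Jersey, the JSON bridge) serialize this bean directly.
    Marshaller m =
        JAXBContext.newInstance(WorkflowContext.class).createMarshaller();
    m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
    m.marshal(wc, System.out);
  }
}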

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.model;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WorkflowDag {
+  
+  public static class WorkflowDagEntry {
+    
+    private String source;
+    private List<String> targets = new ArrayList<String>();
+    
+    public WorkflowDagEntry() {
+      /* Required by JAXB. */
+    }
+    
+    /* Getters. */
+    public String getSource() {
+      return this.source;
+    }
+    
+    public List<String> getTargets() {
+      return this.targets;
+    }
+    
+    /* Setters. */
+    public void setSource(String source) {
+      this.source = source;
+    }
+    
+    public void setTargets(List<String> targets) {
+      this.targets = targets;
+    }
+    
+    public void addTarget(String target) {
+      this.targets.add(target);
+    }
+  }
+  
+  List<WorkflowDagEntry> entries = new ArrayList<WorkflowDagEntry>();
+  
+  public WorkflowDag() {
+    /* Required by JAXB. */
+  }
+  
+  /* Getters. */
+  public List<WorkflowDagEntry> getEntries() {
+    return this.entries;
+  }
+  
+  /* Setters. */
+  public void setEntries(List<WorkflowDagEntry> entries) {
+    this.entries = entries;
+  }
+  
+  public void addEntry(WorkflowDag.WorkflowDagEntry entry) {
+    this.entries.add(entry);
+  }
+  
+  public int size() {
+    Set<String> nodes = new HashSet<String>();
+    for (WorkflowDagEntry entry : entries) {
+      nodes.add(entry.getSource());
+      nodes.addAll(entry.getTargets());
+    }
+    return nodes.size();
+  }
+}
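
A short sketch (illustration, not in this commit) showing why size() collects sources and targets into a Set: a node that appears as both a source and a target is counted once.

import org.apache.ambari.eventdb.model.WorkflowDag;

public class WorkflowDagDemo {
  public static void main(String[] args) {
    WorkflowDag dag = new WorkflowDag();

    WorkflowDag.WorkflowDagEntry a = new WorkflowDag.WorkflowDagEntry();
    a.setSource("A");
    a.addTarget("B");
    a.addTarget("C");
    dag.addEntry(a);

    WorkflowDag.WorkflowDagEntry b = new WorkflowDag.WorkflowDagEntry();
    b.setSource("B");
    b.addTarget("C");
    dag.addEntry(b);

    // Distinct nodes are {A, B, C}: B and C are not double-counted.
    System.out.println(dag.size()); // prints 3
  }
}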

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.eventdb.model;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.commons.lang.StringUtils;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class Workflows {
+  List<WorkflowDBEntry> workflows;
+  
+  public static class WorkflowDBEntry {
+    public static enum WorkflowFields {
+      WORKFLOWID,
+      WORKFLOWNAME,
+      USERNAME,
+      STARTTIME,
+      LASTUPDATETIME,
+      NUMJOBSTOTAL,
+      NUMJOBSCOMPLETED,
+      PARENTWORKFLOWID,
+      WORKFLOWCONTEXT;
+      
+      public String getString(ResultSet rs) throws SQLException {
+        return rs.getString(this.toString());
+      }
+      
+      public int getInt(ResultSet rs) throws SQLException {
+        return rs.getInt(this.toString());
+      }
+      
+      public long getLong(ResultSet rs) throws SQLException {
+        return rs.getLong(this.toString());
+      }
+      
+      public static String join() {
+        String[] tmp = new String[WorkflowFields.values().length];
+        for (int i = 0; i < tmp.length; i++)
+          tmp[i] = WorkflowFields.values()[i].toString();
+        return StringUtils.join(tmp, ",");
+      }
+    }
+    
+    @XmlTransient
+    public static final String WORKFLOW_FIELDS = WorkflowFields.join();
+    
+    private String workflowId;
+    private String workflowName;
+    private String userName;
+    private long startTime;
+    private long elapsedTime;
+    private int numJobsTotal;
+    private int numJobsCompleted;
+    private String parentWorkflowId;
+    private WorkflowContext workflowContext;
+    
+    public WorkflowDBEntry() {
+      /* Required by JAXB. */
+    }
+    
+    public String getWorkflowId() {
+      return workflowId;
+    }
+    
+    public String getWorkflowName() {
+      return workflowName;
+    }
+    
+    public String getUserName() {
+      return userName;
+    }
+    
+    public long getStartTime() {
+      return startTime;
+    }
+    
+    public long getElapsedTime() {
+      return elapsedTime;
+    }
+    
+    public int getNumJobsTotal() {
+      return numJobsTotal;
+    }
+    
+    public int getNumJobsCompleted() {
+      return numJobsCompleted;
+    }
+    
+    public String getParentWorkflowId() {
+      return parentWorkflowId;
+    }
+    
+    public WorkflowContext getWorkflowContext() {
+      return workflowContext;
+    }
+    
+    public void setWorkflowId(String workflowId) {
+      this.workflowId = workflowId;
+    }
+    
+    public void setWorkflowName(String workflowName) {
+      this.workflowName = workflowName;
+    }
+    
+    public void setUserName(String userName) {
+      this.userName = userName;
+    }
+    
+    public void setStartTime(long startTime) {
+      this.startTime = startTime;
+    }
+    
+    public void setElapsedTime(long elapsedTime) {
+      this.elapsedTime = elapsedTime;
+    }
+    
+    public void setNumJobsTotal(int numJobsTotal) {
+      this.numJobsTotal = numJobsTotal;
+    }
+    
+    public void setNumJobsCompleted(int numJobsCompleted) {
+      this.numJobsCompleted = numJobsCompleted;
+    }
+    
+    public void setParentWorkflowId(String parentWorkflowId) {
+      this.parentWorkflowId = parentWorkflowId;
+    }
+    
+    public void setWorkflowContext(WorkflowContext workflowContext) {
+      this.workflowContext = workflowContext;
+    }
+  }
+  
+  public Workflows() {}
+  
+  public List<WorkflowDBEntry> getWorkflows() {
+    return workflows;
+  }
+  
+  public void setWorkflows(List<WorkflowDBEntry> workflows) {
+    this.workflows = workflows;
+  }
+}
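
A hedged sketch (not part of this commit) of how WORKFLOW_FIELDS and the per-field ResultSet accessors fit together; it assumes an open JDBC connection and the "workflow" table created elsewhere in this patch:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;

import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry.WorkflowFields;

public class WorkflowQuerySketch {
  static List<WorkflowDBEntry> fetchWorkflows(Connection connection)
      throws SQLException {
    List<WorkflowDBEntry> result = new ArrayList<WorkflowDBEntry>();
    Statement st = connection.createStatement();
    ResultSet rs = st.executeQuery(
        "SELECT " + WorkflowDBEntry.WORKFLOW_FIELDS + " FROM workflow");
    while (rs.next()) {
      WorkflowDBEntry e = new WorkflowDBEntry();
      e.setWorkflowId(WorkflowFields.WORKFLOWID.getString(rs));
      e.setWorkflowName(WorkflowFields.WORKFLOWNAME.getString(rs));
      e.setUserName(WorkflowFields.USERNAME.getString(rs));
      e.setStartTime(WorkflowFields.STARTTIME.getLong(rs));
      e.setNumJobsTotal(WorkflowFields.NUMJOBSTOTAL.getInt(rs));
      e.setNumJobsCompleted(WorkflowFields.NUMJOBSCOMPLETED.getInt(rs));
      e.setParentWorkflowId(WorkflowFields.PARENTWORKFLOWID.getString(rs));
      // elapsedTime and workflowContext handling omitted in this sketch.
      result.add(e);
    }
    rs.close();
    st.close();
    return result;
  }
}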

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.webservice;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.ContextResolver;
+import javax.xml.bind.JAXBContext;
+
+import org.apache.ambari.eventdb.model.Jobs;
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag;
+import org.apache.ambari.eventdb.model.Workflows;
+
+@Provider
+public class JAXBContextResolver implements ContextResolver<JAXBContext> {
+
+  /* NOTE: Remember to add any new Model classes to this list. */
+  private static final Class<?>[] classes = {
+    WorkflowContext.class,
+    WorkflowDag.class,
+    WorkflowDag.WorkflowDagEntry.class,
+    Jobs.class,
+    Jobs.JobDBEntry.class,
+    Workflows.class,
+    Workflows.WorkflowDBEntry.class
+  };
+
+  private static final Set<Class<?>> types =
+    new HashSet<Class<?>>(Arrays.asList(classes));
+
+  private static final JAXBContext context;
+
+  static {
+    JAXBContext tmpContext;
+
+    try {
+      tmpContext = new JSONJAXBContext(JSONConfiguration.natural().build(), classes);
+    } catch (Exception e) {
+      /* Leave the context null; getContext() will then return null and
+         Jersey falls back to its default JAXBContext for that type. */
+      tmpContext = null;
+    }
+
+    context = tmpContext;
+  }
+
+  @Override
+  public JAXBContext getContext(Class<?> classType) {
+    return (types.contains(classType)) ? context : null;
+  }
+}
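
For reference, a small demo (not in this commit) of the resolver contract: registered model types share the JSON-configured context, anything else gets null so Jersey falls back to its default handling.

import javax.xml.bind.JAXBContext;

import org.apache.ambari.eventdb.model.Workflows;
import org.apache.ambari.eventdb.webservice.JAXBContextResolver;

public class ResolverDemo {
  public static void main(String[] args) {
    JAXBContextResolver resolver = new JAXBContextResolver();
    JAXBContext ctx = resolver.getContext(Workflows.class);
    System.out.println(ctx != null);                        // true, unless the
                                                            // static init failed
    System.out.println(resolver.getContext(String.class));  // null
  }
}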

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.eventdb.webservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.ambari.eventdb.db.PostgresConnector;
+import org.apache.ambari.eventdb.model.Jobs;
+import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
+import org.apache.ambari.eventdb.model.Workflows;
+import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+
+@Path("/json")
+public class WorkflowJsonService {
+  private static final String PREFIX = "eventdb.";
+  private static final String HOSTNAME = PREFIX + "db.hostname";
+  private static final String DBNAME = PREFIX + "db.name";
+  private static final String USERNAME = PREFIX + "db.user";
+  private static final String PASSWORD = PREFIX + "db.password";
+  
+  private static final List<WorkflowDBEntry> EMPTY_WORKFLOWS = Collections.emptyList();
+  private static final List<JobDBEntry> EMPTY_JOBS = Collections.emptyList();
+  
+  @Context
+  ServletContext servletContext;
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/workflow")
+  public Workflows getWorkflows() {
+    Workflows workflows = new Workflows();
+    try {
+      PostgresConnector conn = new PostgresConnector(servletContext.getInitParameter(HOSTNAME), servletContext.getInitParameter(DBNAME),
+          servletContext.getInitParameter(USERNAME), servletContext.getInitParameter(PASSWORD));
+      workflows.setWorkflows(conn.fetchWorkflows());
+      conn.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      workflows.setWorkflows(EMPTY_WORKFLOWS);
+    }
+    return workflows;
+  }
+  
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/job")
+  public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
+    Jobs jobs = new Jobs();
+    try {
+      PostgresConnector conn = new PostgresConnector(servletContext.getInitParameter(HOSTNAME), servletContext.getInitParameter(DBNAME),
+          servletContext.getInitParameter(USERNAME), servletContext.getInitParameter(PASSWORD));
+      jobs.setJobs(conn.fetchJobDetails(workflowId));
+      conn.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      jobs.setJobs(EMPTY_JOBS);
+    }
+    return jobs;
+  }
+}
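
A client-side sketch (not part of this commit) using the Jersey 1.x client API already on the classpath; the base URL and context path are assumptions, and the client would need the same JSON/JAXB configuration to unmarshal the response. The service itself reads eventdb.db.hostname/name/user/password from the servlet context init parameters.

import javax.ws.rs.core.MediaType;

import org.apache.ambari.eventdb.model.Workflows;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.WebResource;

public class WorkflowClientSketch {
  public static void main(String[] args) {
    Client client = Client.create();
    WebResource resource =
        client.resource("http://localhost:8080/ambari-log4j/json/workflow");
    Workflows workflows =
        resource.accept(MediaType.APPLICATION_JSON).get(Workflows.class);
    System.out.println(workflows.getWorkflows().size() + " workflow(s)");
  }
}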

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogParser {
+  
+  void addEventToParse(LoggingEvent event);
+  
+  Object getParseResult() throws IOException;
+  
+}

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogStore {
+  
+  void persist(LoggingEvent originalEvent, Object parsedEvent) 
+      throws IOException;
+  
+  void close() throws IOException;
+}

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+import java.sql.Connection;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogStoreUpdateProvider {
+  
+  void init(Connection connection) throws IOException;
+  
+  void update(LoggingEvent originalEvent, Object parsedEvent) 
+      throws IOException;
+  
+}

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class LoggingThreadRunnable implements Runnable {
+  private static final Log LOG = LogFactory.getLog(LoggingThreadRunnable.class);
+  
+  private final Queue<LoggingEvent> events;
+  private final LogParser parser;
+  private final LogStore store;
+  private final AtomicBoolean done = new AtomicBoolean(false);
+  
+  public LoggingThreadRunnable(
+      Queue<LoggingEvent> events, 
+      LogParser parser, 
+      LogStore provider) {
+    this.events = events;
+    this.store = provider;
+    this.parser = parser;
+  }
+  
+  @Override
+  public void run() {
+    while (!done.get()) {
+      LoggingEvent event = null;
+      while ((event = events.poll()) != null) {
+        Object result = null;
+        try {
+          parser.addEventToParse(event);
+          while ((result = parser.getParseResult()) != null) {
+            try {
+              store.persist(event, result);
+            } catch (IOException e) {
+              LOG.warn("Failed to persist " + result);
+            }
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to parse log-event: " + event);
+        }
+      }
+      // Back off briefly when there is nothing to drain, instead of
+      // busy-spinning on the empty queue.
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+    try {
+      store.close();
+    } catch (IOException ioe) {
+      LOG.info("Failed to close logStore", ioe);
+    }
+  }
+  
+  public void close() throws IOException {
+    done.set(true);
+  }
+}
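
A wiring sketch (illustration only) for the queue -> parser -> store pipeline this runnable drives; the stand-in store below just prints parsed events, where the appender would supply a DatabaseStore:

import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.ambari.log4j.common.LogParser;
import org.apache.ambari.log4j.common.LogStore;
import org.apache.ambari.log4j.common.LoggingThreadRunnable;
import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryParser;
import org.apache.log4j.spi.LoggingEvent;

public class PipelineSketch {
  public static void main(String[] args) throws Exception {
    Queue<LoggingEvent> events = new LinkedBlockingQueue<LoggingEvent>();
    LogParser parser = new MapReduceJobHistoryParser();

    LogStore store = new LogStore() {
      @Override
      public void persist(LoggingEvent originalEvent, Object parsedEvent) {
        System.out.println(parsedEvent); // stand-in for a real store
      }

      @Override
      public void close() {}
    };

    LoggingThreadRunnable runnable =
        new LoggingThreadRunnable(events, parser, store);
    Thread t = new Thread(runnable);
    t.start();

    // Producers (e.g. an Appender) enqueue events here: events.add(event);

    runnable.close(); // flips the done flag; run() then closes the store
    t.join(1000);
  }
}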

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common.store;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.ambari.log4j.common.LogStore;
+import org.apache.ambari.log4j.common.LogStoreUpdateProvider;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class DatabaseStore implements LogStore {
+
+  final private String database;
+  final private String user;
+  final private String password;
+  
+  final private Connection connection;
+  final private LogStoreUpdateProvider updateProvider;
+  
+  public DatabaseStore(String driver, 
+      String database, String user, String password, 
+      LogStoreUpdateProvider updateProvider) 
+      throws IOException {
+    try {
+      Class.forName(driver);
+    } catch (ClassNotFoundException e) {
+      System.err.println("Can't load driver - " + driver);
+      throw new RuntimeException("Can't load driver - " + driver, e);
+    }
+    this.database = database;
+    this.user = (user == null) ? "" : user;
+    this.password = (password == null) ? "" : password;
+    try {
+      this.connection = 
+          DriverManager.getConnection(this.database, this.user, this.password);
+    } catch (SQLException sqle) {
+      throw new IOException("Can't connect to database " + this.database, sqle);
+    }
+    this.updateProvider = updateProvider;
+    
+    this.updateProvider.init(this.connection);
+  }
+  
+  @Override
+  public void persist(LoggingEvent originalEvent, Object parsedEvent)
+      throws IOException {
+    updateProvider.update(originalEvent, parsedEvent);
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      connection.close();
+    } catch (SQLException sqle) {
+      throw new IOException(
+          "Failed to close connectionto database " + this.database, sqle);
+    }
+  }
+}
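
A construction sketch (not in this commit); the driver class, JDBC URL, and credentials are placeholders, and the workflow/job/task/taskAttempt schema is assumed to exist already:

import org.apache.ambari.log4j.common.LogStore;
import org.apache.ambari.log4j.common.store.DatabaseStore;
import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;

public class DatabaseStoreSketch {
  public static void main(String[] args) throws Exception {
    LogStore store = new DatabaseStore(
        "org.postgresql.Driver",                     // placeholder driver
        "jdbc:postgresql://localhost:5432/eventdb",  // placeholder URL
        "user", "password",                          // placeholder credentials
        new MapReduceJobHistoryUpdater());
    try {
      // store.persist(originalEvent, parsedEvent) calls go here.
    } finally {
      store.close();
    }
  }
}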

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.ambari.log4j.common.LogParser;
+import org.apache.ambari.log4j.common.LogStore;
+import org.apache.ambari.log4j.common.LoggingThreadRunnable;
+import org.apache.ambari.log4j.common.store.DatabaseStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.rumen.HistoryEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class JobHistoryAppender extends AppenderSkeleton implements Appender {
+
+  private static final Log LOG = LogFactory.getLog(JobHistoryAppender.class);
+  
+  private final Queue<LoggingEvent> events;
+  private LoggingThreadRunnable logThreadRunnable;
+  private Thread logThread;
+
+  private final LogParser logParser;
+
+  private final LogStore nullStore =
+      new LogStore() {
+        @Override
+        public void persist(LoggingEvent originalEvent, Object parsedEvent) 
+            throws IOException {
+          LOG.info(((HistoryEvent)parsedEvent).toString());
+        }
+
+        @Override
+        public void close() throws IOException {}
+  };
+
+  private String driver;
+  private String database;
+  private String user;
+  private String password;
+  
+  private LogStore logStore;
+  
+  public JobHistoryAppender() {
+    events = new LinkedBlockingQueue<LoggingEvent>();
+    logParser = new MapReduceJobHistoryParser();
+    logStore = nullStore;
+  }
+  
+  /* Getters & Setters for log4j */
+  
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+  
+  public String getDriver() {
+    return driver;
+  }
+
+  public void setDriver(String driver) {
+    this.driver = driver;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  /* --------------------------- */
+
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if ("none".equals(database)) {
+        logStore = nullStore;
+        LOG.info("database set to 'none'");
+      } else {
+        try {
+          LOG.info("Connecting to database " + database +
+              " as user " + user + " password " + password + 
+              " and driver " + driver); 
+          logStore = 
+              new DatabaseStore(driver, database, user, password, 
+                  new MapReduceJobHistoryUpdater());
+        } catch (IOException ioe) {
+          LOG.info("Failed to connect to db " + database, ioe);
+          System.err.println("Failed to connect to db " + database + 
+              " as user " + user +
+              " and driver " + driver + " with " + 
+              StringUtils.stringifyException(ioe));
+          throw new RuntimeException(
+              "Failed to create database store for " + database, ioe);
+        } catch (Exception e) {
+          LOG.info("Failed to connect to db " + database, e);
+          System.err.println("Failed to connect to db " + database + 
+              " as user " + user +
+              " and driver " + driver + " with " + 
+              StringUtils.stringifyException(e));
+          throw new RuntimeException(
+              "Failed to create database store for " + database, e);
+        }
+      }
+      logThreadRunnable = 
+          new LoggingThreadRunnable(events, logParser, logStore);
+      logThread = new Thread(logThreadRunnable);
+      logThread.start();
+
+      super.activateOptions();
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      logThreadRunnable.close();
+    } catch (IOException ioe) {
+      LOG.info("Failed to close logThreadRunnable", ioe);
+    }
+    try {
+      logThread.join(1000);
+    } catch (InterruptedException ie) {
+      LOG.info("logThread interrupted", ie);
+    }
+  }
+
+  @Override
+  public boolean requiresLayout() {
+    return false;
+  }
+
+  @Override
+  protected void append(LoggingEvent event) {
+    events.add(event);
+  }
+}
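
Programmatic configuration sketch (not in this commit; log4j.properties is the usual route). The logger name is an assumption: attach the appender to whichever logger emits the JobHistory lines. Setting database to "none" keeps the in-memory null store; Hadoop's rumen classes must be on the classpath for the parser.

import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.JobHistoryAppender;
import org.apache.log4j.Logger;

public class AppenderSketch {
  public static void main(String[] args) {
    JobHistoryAppender appender = new JobHistoryAppender();
    appender.setDatabase("none"); // skip the DB; log parsed events instead
    appender.activateOptions();   // starts the background logging thread

    Logger logger = Logger.getLogger("JobHistoryLogger"); // assumed name
    logger.addAppender(appender);

    appender.close(); // stops the thread and closes the store
  }
}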

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.ambari.log4j.common.LogParser;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.rumen.Hadoop20JHParser;
+import org.apache.hadoop.tools.rumen.JobHistoryParser;
+import org.apache.hadoop.util.LineReader;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class MapReduceJobHistoryParser implements LogParser {
+  private JobHistoryParser parser;
+  private LogLineReader reader = new LogLineReader("Meta VERSION=\"1\" .");
+  
+  public MapReduceJobHistoryParser() {
+    try {
+      parser = new Hadoop20JHParser(reader);
+    } catch (IOException ioe) {
+      // SHOULD NEVER HAPPEN!
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  @Override
+  public void addEventToParse(LoggingEvent event) {
+    reader.addLine(event.getMessage().toString());
+  }
+  
+  @Override
+  public Object getParseResult() throws IOException {
+    return parser.nextEvent();
+  }
+
+  static class LogLineReader extends LineReader {
+
+    private Queue<String> lines = new LinkedBlockingQueue<String>();
+    
+    public LogLineReader(String line) {
+      super(null);
+      addLine(line);
+    }
+
+    private void addLine(String line) {
+      lines.add(line);
+    }
+    
+    public int readLine(Text str) throws IOException {
+      String line = lines.poll();
+      if (line != null) {
+        str.set(line);
+        return line.length();
+      }
+      
+      return 0;
+    }
+    
+    public void close() throws IOException {
+    }
+  }
+}

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,958 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag;
+import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
+import org.apache.ambari.log4j.common.LogStoreUpdateProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.HistoryEvent;
+import org.apache.hadoop.tools.rumen.JobFinishedEvent;
+import org.apache.hadoop.tools.rumen.JobInfoChangeEvent;
+import org.apache.hadoop.tools.rumen.JobInitedEvent;
+import org.apache.hadoop.tools.rumen.JobStatusChangedEvent;
+import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
+import org.apache.hadoop.tools.rumen.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.tools.rumen.MapAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptStartedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.tools.rumen.TaskFailedEvent;
+import org.apache.hadoop.tools.rumen.TaskFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskStartedEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.spi.LoggingEvent;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
+  
+  private static final Log LOG = 
+      LogFactory.getLog(MapReduceJobHistoryUpdater.class);
+  
+  private Connection connection;
+  
+  private static final String WORKFLOW_TABLE = "workflow";
+  private static final String JOB_TABLE = "job";
+  private static final String TASK_TABLE = "task";
+  private static final String TASKATTEMPT_TABLE = "taskAttempt";
+  
+  private PreparedStatement workflowPS = null;
+  private PreparedStatement workflowSelectPS = null;
+  private PreparedStatement workflowUpdateTimePS = null;
+  private PreparedStatement workflowUpdateNumCompletedPS = null;
+  
+  private Map<Class<? extends HistoryEvent>, PreparedStatement> entitySqlMap =
+      new HashMap<Class<? extends HistoryEvent>, PreparedStatement>();
+  
+  @Override
+  public void init(Connection connection) throws IOException {
+    this.connection = connection;
+    
+    try {
+      initializePreparedStatements();
+    } catch (SQLException sqle) {
+      throw new IOException(sqle);
+    }
+  }
+  
+  private void initializePreparedStatements() throws SQLException {
+    initializeJobPreparedStatements();
+    initializeTaskPreparedStatements();
+    initializeTaskAttemptPreparedStatements();
+  }
+  
+  private PreparedStatement jobEndUpdate;
+  
+  private void initializeJobPreparedStatements() throws SQLException {
+
+    /** 
+     * Job events
+     */
+
+    // JobSubmittedEvent
+
+    PreparedStatement jobSubmittedPrepStmnt =
+        connection.prepareStatement(
+            "INSERT INTO " + 
+                JOB_TABLE + 
+                " (" +
+                "jobId, " +
+                "jobName, " +
+                "userName, " +
+                "confPath, " +
+                "queue, " +
+                "submitTime, " +
+                "workflowId, " +
+                "workflowEntityName " +
+                ") " +
+                "VALUES" +
+                " (?, ?, ?, ?, ?, ?, ?, ?)"
+            );
+    entitySqlMap.put(JobSubmittedEvent.class, jobSubmittedPrepStmnt);
+    
+    workflowSelectPS =
+        connection.prepareStatement(
+            "SELECT workflowId FROM " + WORKFLOW_TABLE + " where workflowId = ?"
+            );
+
+    workflowPS = 
+        connection.prepareStatement(
+            "INSERT INTO " +
+                WORKFLOW_TABLE +
+                " (" +
+                "workflowId, " +
+                "workflowName, " +
+                "workflowContext, " +
+                "userName, " +
+                "startTime, " +
+                "lastUpdateTime, " +
+                "numJobsTotal, " +
+                "numJobsCompleted" +
+                ") " +
+                "VALUES" +
+                " (?, ?, ?, ?, ?, ?, ?, ?)"
+            );
+    
+    workflowUpdateTimePS =
+        connection.prepareStatement(
+            "UPDATE " +
+                WORKFLOW_TABLE +
+                " SET " +
+                "lastUpdateTime = ? " +
+                "WHERE workflowId = ?"
+            );
+    
+    workflowUpdateNumCompletedPS =
+        connection.prepareStatement(
+            "UPDATE " +
+                WORKFLOW_TABLE +
+                " SET " +
+                "lastUpdateTime = ?, " +
+                "numJobsCompleted = numJobsCompleted + 1 " +
+                "WHERE workflowId = " +
+                "(SELECT workflowId FROM " +
+                JOB_TABLE +
+                " WHERE jobId = ?)"
+            );
+    
+    // JobFinishedEvent
+
+    PreparedStatement jobFinishedPrepStmnt = 
+        connection.prepareStatement(
+            "UPDATE " +
+                JOB_TABLE +
+                " SET " +
+                "finishTime = ?, " +
+                "finishedMaps = ?, " +
+                "finishedReduces= ?, " +
+                "failedMaps = ?, " +
+                "failedReduces = ?, " +
+                "inputBytes = ?, " +
+                "outputBytes = ? " +
+                "WHERE " +
+                "jobId = ?" 
+            );
+    entitySqlMap.put(JobFinishedEvent.class, jobFinishedPrepStmnt);
+
+    // JobInitedEvent
+    
+    PreparedStatement jobInitedPrepStmnt = 
+        connection.prepareStatement(
+            "UPDATE " +
+                JOB_TABLE +
+                " SET " +
+                "launchTime = ?, " +
+                "maps = ?, " +
+                "reduces = ?, " +
+                "status = ? "+
+                "WHERE " +
+                "jobId = ?" 
+            );
+    entitySqlMap.put(JobInitedEvent.class, jobInitedPrepStmnt);
+
+    // JobStatusChangedEvent
+    
+    PreparedStatement jobStatusChangedPrepStmnt = 
+        connection.prepareStatement(
+            "UPDATE " +
+                JOB_TABLE +
+                " SET " +
+                "status = ? "+
+                "WHERE " +
+                "jobId = ?" 
+            );
+    entitySqlMap.put(JobStatusChangedEvent.class, jobStatusChangedPrepStmnt);
+
+    // JobInfoChangedEvent
+    
+    PreparedStatement jobInfoChangedPrepStmnt = 
+        connection.prepareStatement(
+            "UPDATE " +
+                JOB_TABLE +
+                " SET " +
+                "submitTime = ?, " +
+                "launchTime = ? " +
+                "WHERE " +
+                "jobId = ?" 
+            );
+    entitySqlMap.put(JobInfoChangeEvent.class, jobInfoChangedPrepStmnt);
+
+    // JobUnsuccessfulCompletionEvent
+    PreparedStatement jobUnsuccessfulPrepStmnt = 
+        connection.prepareStatement(
+            "UPDATE " +
+                JOB_TABLE +
+                " SET " +
+                "finishTime = ?, " +
+                "finishedMaps = ?, " +
+                "finishedReduces = ?, " +
+                "status = ? " +
+                "WHERE " +
+                "jobId = ?" 
+            );
+    entitySqlMap.put(
+        JobUnsuccessfulCompletionEvent.class, jobUnsuccessfulPrepStmnt);
+
+    // Job update at the end
+    jobEndUpdate =
+        connection.prepareStatement(
+            "UPDATE " +
+              JOB_TABLE +
+              " SET " +
+              " mapsRuntime = (" +
+                "SELECT " +
+                "SUM(" + 
+                      TASKATTEMPT_TABLE + ".finishTime" +  " - " + 
+                      TASKATTEMPT_TABLE + ".startTime" +
+                      ")" +
+                " FROM " +
+                TASKATTEMPT_TABLE + 
+                " WHERE " +
+                TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
+                " AND " +
+                TASKATTEMPT_TABLE + ".taskType = ?)" +
+              ", " +
+              " reducesRuntime = (" +
+                "SELECT SUM(" + 
+                             TASKATTEMPT_TABLE + ".finishTime" +  " - " + 
+                             TASKATTEMPT_TABLE + ".startTime" +
+                             ")" +
+                " FROM " +
+                TASKATTEMPT_TABLE + 
+                " WHERE " +
+                TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
+                " AND " +
+                TASKATTEMPT_TABLE + ".taskType = ?) " +
+              " WHERE " +
+                 "jobId = ?"
+            );
+  }
+  
+  private void initializeTaskPreparedStatements() throws SQLException {
+
+    /** 
+     * Task events
+     */
+
+    // TaskStartedEvent 
+    
+    PreparedStatement taskStartedPrepStmnt =
+        connection.prepareStatement(
+            "INSERT INTO " +
+                TASK_TABLE +
+                " (" +
+                "jobId, " +
+                "taskType, " +
+                "splits, " +
+                "startTime, " +
+                "taskId" +
+                ") " +
+                "VALUES (?, ?, ?, ?, ?)"
+            );
+    entitySqlMap.put(TaskStartedEvent.class, taskStartedPrepStmnt);
+    
+    // TaskFinishedEvent
+    
+    PreparedStatement taskFinishedPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " +
+                TASK_TABLE +
+                " SET " +
+                "jobId = ?, " +
+                "taskType = ?, " +
+                "status = ?, " +
+                "finishTime = ? " +
+                " WHERE " +
+                "taskId = ?"
+            );
+    entitySqlMap.put(TaskFinishedEvent.class, taskFinishedPrepStmnt);
+
+    // TaskFailedEvent
+
+    PreparedStatement taskFailedPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " + 
+                TASK_TABLE + 
+                " SET " +
+                "jobId = ?, " +
+                "taskType = ?, " +
+                "status = ?, " +
+                "finishTime = ?, " +
+                "error = ?, " +
+                "failedAttempt = ? " +
+                "WHERE " +
+                "taskId = ?"
+            );
+    entitySqlMap.put(TaskFailedEvent.class, taskFailedPrepStmnt);
+  }
+
+  private void initializeTaskAttemptPreparedStatements() throws SQLException {
+
+    /**
+     * TaskAttempt events
+     */
+
+    // TaskAttemptStartedEvent
+    
+    PreparedStatement taskAttemptStartedPrepStmnt =
+        connection.prepareStatement(
+            "INSERT INTO " +
+                TASKATTEMPT_TABLE +
+                " (" +
+                "jobId, " +
+                "taskId, " +
+                "taskType, " +
+                "startTime, " +
+                "taskTracker, " +
+                "locality, " +
+                "avataar, " +
+                "taskAttemptId" +
+                ") " +
+                "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+            );
+    entitySqlMap.put(
+        TaskAttemptStartedEvent.class, taskAttemptStartedPrepStmnt);
+
+    // TaskAttemptFinishedEvent
+    
+    PreparedStatement taskAttemptFinishedPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " +
+                TASKATTEMPT_TABLE +
+                " SET " +
+                "jobId = ?, " +
+                "taskId = ?, " +
+                "taskType = ?, " +
+                "finishTime = ?, " +
+                "status = ?, " +
+                "taskTracker = ? " +
+                " WHERE " +
+                "taskAttemptId = ?"
+            );
+    entitySqlMap.put(
+        TaskAttemptFinishedEvent.class, taskAttemptFinishedPrepStmnt);
+
+    // TaskAttemptUnsuccessfulEvent
+    
+    PreparedStatement taskAttemptUnsuccessfulPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " +
+                TASKATTEMPT_TABLE +
+                " SET " +
+                "jobId = ?, " +
+                "taskId = ?, " +
+                "taskType = ?, " +
+                "finishTime = ?, " +
+                "status = ?, " +
+                "taskTracker = ?, " +
+                "error = ? " +
+                " WHERE " +
+                "taskAttemptId = ?"
+            );
+    entitySqlMap.put(
+        TaskAttemptUnsuccessfulCompletionEvent.class, 
+        taskAttemptUnsuccessfulPrepStmnt);
+
+    // MapAttemptFinishedEvent
+    
+    PreparedStatement mapAttemptFinishedPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " +
+                TASKATTEMPT_TABLE +
+                " SET " +
+                "jobId = ?, " +
+                "taskId = ?, " +
+                "taskType = ?, " +
+                "mapFinishTime = ?, " +
+                "finishTime = ?, " +
+                "status = ?, " +
+                "taskTracker = ? " +
+                " WHERE " +
+                "taskAttemptId = ?"
+            );
+    entitySqlMap.put(
+        MapAttemptFinishedEvent.class, mapAttemptFinishedPrepStmnt);
+
+    // ReduceAttemptFinishedEvent
+    
+    PreparedStatement reduceAttemptFinishedPrepStmnt =
+        connection.prepareStatement(
+            "UPDATE " +
+                TASKATTEMPT_TABLE +
+                " SET " +
+                "jobId = ?, " +
+                "taskId = ?, " +
+                "taskType = ?, " +
+                "shuffleFinishTime = ?, " +
+                "sortFinishTime = ?, " +
+                "finishTime = ?, " +
+                "status = ?, " +
+                "taskTracker = ? " +
+                " WHERE " +
+                "taskAttemptId = ?"
+            );
+    entitySqlMap.put(
+        ReduceAttemptFinishedEvent.class, reduceAttemptFinishedPrepStmnt);
+  }
+
+  private void doUpdates(LoggingEvent originalEvent,
+      Object parsedEvent) throws SQLException {
+    Class<?> eventClass = parsedEvent.getClass();
+    
+    PreparedStatement entityPS = entitySqlMap.get(eventClass);
+    if (entityPS == null) {
+      LOG.debug("No prepared statement for " + eventClass);
+      return;
+    }
+  
+    if (eventClass == JobSubmittedEvent.class) {
+      processJobSubmittedEvent(entityPS, workflowSelectPS, workflowPS, 
+          workflowUpdateTimePS, originalEvent, 
+          (JobSubmittedEvent)parsedEvent);
+    } else if (eventClass == JobFinishedEvent.class) {
+      processJobFinishedEvent(entityPS, 
+          originalEvent, (JobFinishedEvent)parsedEvent);
+    } else if (eventClass == JobInitedEvent.class) {
+      processJobInitedEvent(entityPS, 
+          originalEvent, (JobInitedEvent)parsedEvent);
+    } else if (eventClass == JobStatusChangedEvent.class) {
+      processJobStatusChangedEvent(entityPS, workflowUpdateNumCompletedPS,
+          originalEvent, (JobStatusChangedEvent)parsedEvent);
+    } else if (eventClass == JobInfoChangeEvent.class) {
+      processJobInfoChangeEvent(entityPS, 
+          originalEvent, (JobInfoChangeEvent)parsedEvent);
+    } else if (eventClass == JobUnsuccessfulCompletionEvent.class) {
+      processJobUnsuccessfulEvent(entityPS, 
+          originalEvent, (JobUnsuccessfulCompletionEvent)parsedEvent);
+    } else if (eventClass == TaskStartedEvent.class) {
+      processTaskStartedEvent(entityPS, 
+          originalEvent, (TaskStartedEvent)parsedEvent);
+    } else if (eventClass == TaskFinishedEvent.class) {
+      processTaskFinishedEvent(entityPS, 
+          originalEvent, (TaskFinishedEvent)parsedEvent);
+    } else if (eventClass == TaskFailedEvent.class) {
+      processTaskFailedEvent(entityPS, 
+          originalEvent, (TaskFailedEvent)parsedEvent);
+    } else if (eventClass == TaskAttemptStartedEvent.class) {
+      processTaskAttemptStartedEvent(entityPS, 
+          originalEvent, (TaskAttemptStartedEvent)parsedEvent);
+    } else if (eventClass == TaskAttemptFinishedEvent.class) {
+      processTaskAttemptFinishedEvent(entityPS, 
+          originalEvent, (TaskAttemptFinishedEvent)parsedEvent);
+    } else if (eventClass == TaskAttemptUnsuccessfulCompletionEvent.class) {
+      processTaskAttemptUnsuccessfulEvent(entityPS, 
+          originalEvent, (TaskAttemptUnsuccessfulCompletionEvent)parsedEvent);
+    } else if (eventClass == MapAttemptFinishedEvent.class) {
+      processMapAttemptFinishedEvent(entityPS, 
+          originalEvent, (MapAttemptFinishedEvent)parsedEvent);
+    } else if (eventClass == ReduceAttemptFinishedEvent.class) {
+      processReduceAttemptFinishedEvent(entityPS, 
+          originalEvent, (ReduceAttemptFinishedEvent)parsedEvent);
+    }
+  }
+  
+  private void updateJobStatsAtFinish(String jobId) {
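+    // jobEndUpdate is a PreparedStatement prepared elsewhere in this class;
+    // judging by the failure message below, it back-fills the job's
+    // mapsRuntime/reducesRuntime columns from its MAP and REDUCE attempts.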
+    try {
+      jobEndUpdate.setString(1, "MAP");
+      jobEndUpdate.setString(2, "REDUCE");
+      jobEndUpdate.setString(3, jobId);
+      jobEndUpdate.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to update mapsRuntime/reducesRuntime for " + jobId, 
+          sqle);
+    }
+  }
+  
+  private static WorkflowContext generateWorkflowContext(
+      JobSubmittedEvent historyEvent) {
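+    // The job carried no workflow information, so synthesize a trivial
+    // single-node DAG with the placeholder name "X" and derive the workflow
+    // id from the job id (job_... becomes mr_...).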
+    WorkflowDag wfDag = new WorkflowDag();
+    WorkflowDagEntry wfDagEntry = new WorkflowDagEntry();
+    wfDagEntry.setSource("X");
+    wfDag.addEntry(wfDagEntry);
+    
+    WorkflowContext wc = new WorkflowContext();
+    wc.setWorkflowId(historyEvent.getJobId().toString().replace("job_", "mr_"));
+    wc.setWorkflowName(historyEvent.getJobName());
+    wc.setWorkflowEntityName("X");
+    wc.setWorkflowDag(wfDag);
+    return wc;
+  }
+  
+  // This is based on the regex in org.apache.hadoop.tools.rumen.ParsedLine,
+  // except that it assumes the format "key"="value", so both the key and the
+  // value are quoted and may contain escaped characters.
+  private static final Pattern adjPattern = 
+      Pattern.compile("\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\"" + "=" + 
+          "\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\" ");
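+  // For example, an escaped adjacency string of the form
+  //   "A"="B,C" "B"="C" "C"=""
+  // (each pair followed by a space) yields the matches (A, "B,C"), (B, "C"),
+  // (C, ""); group 1 is the source node, group 2 its comma-separated targets.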
+  
+  public static WorkflowContext buildWorkflowContext(JobSubmittedEvent historyEvent) {
+    String workflowId = historyEvent.getWorkflowId()
+        .replace("\\", "");
+    if (workflowId.isEmpty())
+      return generateWorkflowContext(historyEvent);
+    String workflowName = historyEvent.getWorkflowName()
+        .replace("\\", "");
+    String workflowNodeName = historyEvent.getWorkflowNodeName()
+        .replace("\\", "");
+    String workflowAdjacencies = StringUtils.unEscapeString(
+        historyEvent.getWorkflowAdjacencies(),
+        StringUtils.ESCAPE_CHAR, new char[] {'"', '=', '.'});
+    WorkflowContext context = new WorkflowContext();
+    context.setWorkflowId(workflowId);
+    context.setWorkflowName(workflowName);
+    context.setWorkflowEntityName(workflowNodeName);
+    WorkflowDag dag = new WorkflowDag();
+    Matcher matcher = adjPattern.matcher(workflowAdjacencies);
+
+    while (matcher.find()) {
+      WorkflowDagEntry dagEntry = new WorkflowDagEntry();
+      dagEntry.setSource(matcher.group(1).replace("\\", ""));
+      String[] values = StringUtils.getStrings(
+          matcher.group(2).replace("\\", ""));
+      if (values != null) {
+        for (String target : values) {
+          dagEntry.addTarget(target);
+        }
+      }
+      dag.addEntry(dagEntry);
+    }
+    
+    context.setWorkflowDag(dag);
+    return context;
+  }
+  
+  private void processJobSubmittedEvent(
+      PreparedStatement jobPS, 
+      PreparedStatement workflowSelectPS, PreparedStatement workflowPS, 
+      PreparedStatement workflowUpdateTimePS, LoggingEvent logEvent, 
+      JobSubmittedEvent historyEvent) {
+
+    try {
+      String jobId = historyEvent.getJobId().toString();
+      jobPS.setString(1, jobId);
+      jobPS.setString(2, historyEvent.getJobName());
+      jobPS.setString(3, historyEvent.getUserName());
+      jobPS.setString(4, historyEvent.getJobConfPath());
+      jobPS.setString(5, historyEvent.getJobQueueName());
+      jobPS.setLong(6, historyEvent.getSubmitTime());
+      
+      WorkflowContext workflowContext = buildWorkflowContext(historyEvent);
+      
+      // Get workflow information
+      boolean insertWorkflow = false;
+      
+      try {
+        workflowSelectPS.setString(1, workflowContext.getWorkflowId());
+        workflowSelectPS.execute();
+        ResultSet rs = workflowSelectPS.getResultSet();
+        insertWorkflow = !rs.next();
+      } catch (SQLException sqle) {
+        LOG.warn("Workflow select failed for " + 
+            workflowContext.getWorkflowId(), sqle);
+        // Fall back to the update path rather than risk a duplicate insert.
+        insertWorkflow = false;
+      }
+
+      // Insert workflow 
+      if (insertWorkflow) {
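+        // Only the DAG and any parent context go into the serialized JSON
+        // blob; the workflow id and name are stored in their own columns.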
+        WorkflowContext sanitizedWC = new WorkflowContext();
+        sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag());
+        sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext());
+
+        String sanitizedWCString = null;
+        try {
+          ObjectMapper om = new ObjectMapper();
+          sanitizedWCString = om.writeValueAsString(sanitizedWC);
+        } catch (IOException e) {
+          LOG.warn("Failed to serialize sanitized workflow context to JSON", e);
+          sanitizedWCString = "";
+        } 
+
+        workflowPS.setString(1, workflowContext.getWorkflowId());
+        workflowPS.setString(2, workflowContext.getWorkflowName());
+        workflowPS.setString(3, sanitizedWCString);
+        workflowPS.setString(4, historyEvent.getUserName());
+        workflowPS.setLong(5, historyEvent.getSubmitTime());
+        workflowPS.setLong(6, historyEvent.getSubmitTime());
+        workflowPS.setLong(7, workflowContext.getWorkflowDag().size());
+        workflowPS.setLong(8, 0);
+        workflowPS.executeUpdate();
+        LOG.debug("Successfully inserted workflowId = " + 
+            workflowContext.getWorkflowId());
+      } else {
+        workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime());
+        workflowUpdateTimePS.setString(2, workflowContext.getWorkflowId());
+        workflowUpdateTimePS.executeUpdate();
+        LOG.debug("Successfully updated workflowId = " + 
+            workflowContext.getWorkflowId());
+      }
+
+      // Insert job
+      jobPS.setString(7, workflowContext.getWorkflowId());
+      jobPS.setString(8, workflowContext.getWorkflowEntityName());
+      jobPS.executeUpdate();
+      LOG.debug("Successfully inserted job = " + jobId + 
+          " and workflowId = " + workflowContext.getWorkflowId());
+
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+    } catch (Exception e) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, e);
+    }
+  }
+  
+  private void processJobFinishedEvent(
+      PreparedStatement entityPS,
+      LoggingEvent logEvent, JobFinishedEvent historyEvent) {
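+    // inputBytes is taken from the map counters' HDFS_BYTES_READ value;
+    // outputBytes is HDFS_BYTES_WRITTEN, read from the reduce counters when
+    // the job ran reduces and from the map counters otherwise.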
+    Counters counters = historyEvent.getMapCounters();
+    long inputBytes = 0;
+    if (counters != null) {
+      for (CounterGroup group : counters) {
+        for (Counter counter : group) {
+          if (counter.getName().equals("HDFS_BYTES_READ"))
+            inputBytes += counter.getValue();
+        }
+      }
+    }
+    if (historyEvent.getFinishedReduces() != 0)
+      counters = historyEvent.getReduceCounters();
+    long outputBytes = 0;
+    if (counters != null) {
+      for (CounterGroup group : counters) {
+        for (Counter counter : group) {
+          if (counter.getName().equals("HDFS_BYTES_WRITTEN"))
+            outputBytes += counter.getValue();
+        }
+      }
+    }
+    try {
+      entityPS.setLong(1, historyEvent.getFinishTime());
+      entityPS.setInt(2, historyEvent.getFinishedMaps());
+      entityPS.setInt(3, historyEvent.getFinishedReduces());
+      entityPS.setInt(4, historyEvent.getFailedMaps());
+      entityPS.setInt(5, historyEvent.getFailedReduces());
+      entityPS.setLong(6, inputBytes);
+      entityPS.setLong(7, outputBytes);
+      entityPS.setString(8, historyEvent.getJobid().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobid() + " into " + JOB_TABLE, sqle);
+    }
+    
+    updateJobStatsAtFinish(historyEvent.getJobid().toString());
+
+  }
+
+  private void processJobInitedEvent(
+      PreparedStatement entityPS, 
+      LoggingEvent logEvent, JobInitedEvent historyEvent) {
+    try {
+      entityPS.setLong(1, historyEvent.getLaunchTime());
+      entityPS.setInt(2, historyEvent.getTotalMaps());
+      entityPS.setInt(3, historyEvent.getTotalReduces());
+      entityPS.setString(4, historyEvent.getStatus());
+      entityPS.setString(5, historyEvent.getJobId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+    }
+  }
+
+  private void processJobStatusChangedEvent(
+      PreparedStatement entityPS, 
+      PreparedStatement workflowUpdateNumCompletedPS, 
+      LoggingEvent logEvent, JobStatusChangedEvent historyEvent) {
+    try {
+      entityPS.setString(1, historyEvent.getStatus());
+      entityPS.setString(2, historyEvent.getJobId().toString());
+      entityPS.executeUpdate();
+      if ("SUCCESS".equals(historyEvent.getStatus())) {
+        workflowUpdateNumCompletedPS.setLong(1, System.currentTimeMillis());
+        workflowUpdateNumCompletedPS.setString(2, 
+            historyEvent.getJobId().toString());
+        workflowUpdateNumCompletedPS.executeUpdate();
+      }
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+    }
+  }
+
+  private void processJobInfoChangeEvent(
+      PreparedStatement entityPS, 
+      LoggingEvent logEvent, JobInfoChangeEvent historyEvent) {
+    try {
+      entityPS.setLong(1, historyEvent.getSubmitTime());
+      entityPS.setLong(2, historyEvent.getLaunchTime());
+      entityPS.setString(3, historyEvent.getJobId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+    }
+  }
+
+  private void processJobUnsuccessfulEvent(
+      PreparedStatement entityPS, 
+      LoggingEvent logEvent, JobUnsuccessfulCompletionEvent historyEvent) {
+    try {
+      entityPS.setLong(1, historyEvent.getFinishTime());
+      entityPS.setLong(2, historyEvent.getFinishedMaps());
+      entityPS.setLong(3, historyEvent.getFinishedReduces());
+      entityPS.setString(4, historyEvent.getStatus());
+      entityPS.setString(5, historyEvent.getJobId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for job " + 
+          historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+    }
+    
+    updateJobStatsAtFinish(historyEvent.getJobId().toString());
+  }
+
+  private void processTaskStartedEvent(PreparedStatement entityPS,
+      LoggingEvent logEvent, TaskStartedEvent historyEvent) {
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskType().toString());
+      entityPS.setString(3, historyEvent.getSplitLocations());
+      entityPS.setLong(4, historyEvent.getStartTime());
+      entityPS.setString(5, historyEvent.getTaskId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for task " + 
+          historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+    }
+  }
+
+  private void processTaskFinishedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, TaskFinishedEvent historyEvent) {
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskType().toString());
+      entityPS.setString(3, historyEvent.getTaskStatus());
+      entityPS.setLong(4, historyEvent.getFinishTime());
+      entityPS.setString(5, historyEvent.getTaskId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for task " + 
+          historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+    }
+  }
+
+  private void processTaskFailedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, TaskFailedEvent historyEvent) {
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskType().toString());
+      entityPS.setString(3, historyEvent.getTaskStatus());
+      entityPS.setLong(4, historyEvent.getFinishTime());
+      entityPS.setString(5, historyEvent.getError());
+      entityPS.setString(6, historyEvent.getFailedAttemptID().toString());
+      entityPS.setString(7, historyEvent.getTaskId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + " for task " + 
+          historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+    }
+  }
+
+  private void processTaskAttemptStartedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, TaskAttemptStartedEvent historyEvent) {
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskId().toString());
+      entityPS.setString(3, historyEvent.getTaskType().toString());
+      entityPS.setLong(4, historyEvent.getStartTime());
+      entityPS.setString(5, historyEvent.getTrackerName());
+      entityPS.setString(6, historyEvent.getLocality().toString());
+      entityPS.setString(7, historyEvent.getAvataar().toString());
+      entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + 
+          " for taskAttempt " + historyEvent.getTaskAttemptId() + 
+          " into " + TASKATTEMPT_TABLE, sqle);
+    }
+  }
+  
+  private void processTaskAttemptFinishedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, TaskAttemptFinishedEvent historyEvent) {
+    
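+    // MAP and REDUCE attempts are handled by the dedicated
+    // MapAttemptFinishedEvent/ReduceAttemptFinishedEvent processors below,
+    // which also record the extra phase timings; only other attempt types
+    // (e.g. setup and cleanup) fall through to this generic update.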
+    if (historyEvent.getTaskType() == TaskType.MAP || 
+        historyEvent.getTaskType() == TaskType.REDUCE) {
+      LOG.debug("Ignoring TaskAttemptFinishedEvent for " + 
+        historyEvent.getTaskType());
+      return;
+    }
+    
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskId().toString());
+      entityPS.setString(3, historyEvent.getTaskType().toString());
+      entityPS.setLong(4, historyEvent.getFinishTime());
+      entityPS.setString(5, historyEvent.getTaskStatus());
+      entityPS.setString(6, historyEvent.getHostname());
+      entityPS.setString(7, historyEvent.getAttemptId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + 
+          " for taskAttempt " + historyEvent.getAttemptId() + 
+          " into " + TASKATTEMPT_TABLE, sqle);
+    }
+  }
+  
+  private void processTaskAttemptUnsuccessfulEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, 
+      TaskAttemptUnsuccessfulCompletionEvent historyEvent) {
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskId().toString());
+      entityPS.setString(3, historyEvent.getTaskType().toString());
+      entityPS.setLong(4, historyEvent.getFinishTime());
+      entityPS.setString(5, historyEvent.getTaskStatus());
+      entityPS.setString(6, historyEvent.getHostname());
+      entityPS.setString(7, historyEvent.getError());
+      entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + 
+          " for taskAttempt " + historyEvent.getTaskAttemptId() + 
+          " into " + TASKATTEMPT_TABLE, sqle);
+    }
+  }
+  
+  private void processMapAttemptFinishedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, MapAttemptFinishedEvent historyEvent) {
+    
+    if (historyEvent.getTaskType() != TaskType.MAP) {
+      LOG.debug("Ignoring MapAttemptFinishedEvent for " + 
+        historyEvent.getTaskType());
+      return;
+    }
+
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskId().toString());
+      entityPS.setString(3, historyEvent.getTaskType().toString());
+      entityPS.setLong(4, historyEvent.getMapFinishTime());
+      entityPS.setLong(5, historyEvent.getFinishTime());
+      entityPS.setString(6, historyEvent.getTaskStatus());
+      entityPS.setString(7, historyEvent.getHostname());
+      entityPS.setString(8, historyEvent.getAttemptId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + 
+          " for taskAttempt " + historyEvent.getAttemptId() + 
+          " into " + TASKATTEMPT_TABLE, sqle);
+    }
+  }
+  
+  
+  private void processReduceAttemptFinishedEvent(
+      PreparedStatement entityPS,  
+      LoggingEvent logEvent, ReduceAttemptFinishedEvent historyEvent) {
+    if (historyEvent.getTaskType() != TaskType.REDUCE) {
+      LOG.debug("Ignoring ReduceAttemptFinishedEvent for " + 
+        historyEvent.getTaskType());
+      return;
+    }
+
+    try {
+      entityPS.setString(1, 
+          historyEvent.getTaskId().getJobID().toString());
+      entityPS.setString(2, historyEvent.getTaskId().toString());
+      entityPS.setString(3, historyEvent.getTaskType().toString());
+      entityPS.setLong(4, historyEvent.getShuffleFinishTime());
+      entityPS.setLong(5, historyEvent.getSortFinishTime());
+      entityPS.setLong(6, historyEvent.getFinishTime());
+      entityPS.setString(7, historyEvent.getTaskStatus());
+      entityPS.setString(8, historyEvent.getHostname());
+      entityPS.setString(9, historyEvent.getAttemptId().toString());
+      entityPS.executeUpdate();
+    } catch (SQLException sqle) {
+      LOG.info("Failed to store " + historyEvent.getEventType() + 
+          " for taskAttempt " + historyEvent.getAttemptId() + 
+          " into " + TASKATTEMPT_TABLE, sqle);
+    }
+  }
+  
+  
+  @Override
+  public void update(LoggingEvent originalEvent, Object parsedEvent) 
+      throws IOException {
+    try {
+      doUpdates(originalEvent, parsedEvent);
+    } catch (SQLException sqle) {
+      throw new IOException(sqle);
+    }
+  }
+
+}

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema Wed Nov  7 08:49:58 2012
@@ -0,0 +1,70 @@
+CREATE TABLE workflow (
+  workflowId TEXT, workflowName TEXT,
+  parentWorkflowId TEXT,  
+  workflowContext TEXT, userName TEXT,
+  startTime BIGINT, lastUpdateTime BIGINT,
+  numJobsTotal INTEGER, numJobsCompleted INTEGER,
+  PRIMARY KEY (workflowId),
+  FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId)
+);
+
+CREATE TABLE job (
+  jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+  userName TEXT, queue TEXT, acls TEXT, confPath TEXT, 
+  submitTime BIGINT, launchTime BIGINT, finishTime BIGINT, 
+  maps INTEGER, reduces INTEGER, status TEXT, priority TEXT, 
+  finishedMaps INTEGER, finishedReduces INTEGER, 
+  failedMaps INTEGER, failedReduces INTEGER, 
+  mapsRuntime BIGINT, reducesRuntime BIGINT,
+  mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT, 
+  inputBytes BIGINT, outputBytes BIGINT,
+  PRIMARY KEY(jobId),
+  FOREIGN KEY(workflowId) REFERENCES workflow(workflowId)
+);
+
+CREATE TABLE task (
+  taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT, 
+  startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT, 
+  failedAttempt TEXT, 
+  PRIMARY KEY(taskId), 
+  FOREIGN KEY(jobId) REFERENCES job(jobId)
+);
+
+CREATE TABLE taskAttempt (
+  taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT, 
+  startTime BIGINT, finishTime BIGINT, 
+  mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT, 
+  locality TEXT, avataar TEXT, 
+  status TEXT, error TEXT, counters TEXT, 
+  PRIMARY KEY(taskAttemptId), 
+  FOREIGN KEY(jobId) REFERENCES job(jobId), 
+  FOREIGN KEY(taskId) REFERENCES task(taskId)
+); 
+
+CREATE TABLE hdfsEvent (
+  timestamp BIGINT,
+  userName TEXT,
+  clientIP TEXT,
+  operation TEXT,
+  srcPath TEXT,
+  dstPath TEXT,
+  permissions TEXT
+);
+
+CREATE TABLE mapreduceEvent (
+  timestamp BIGINT,
+  userName TEXT,
+  clientIP TEXT,
+  operation TEXT,
+  target TEXT,
+  result TEXT,
+  description TEXT,
+  permissions TEXT
+);
+
+CREATE TABLE clusterEvent (
+  timestamp BIGINT, 
+  service TEXT, status TEXT, 
+  error TEXT, data TEXT , 
+  host TEXT, rack TEXT
+);
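+
+-- Illustrative query (not part of the original schema): report workflow
+-- progress as constituent jobs complete.
+--   SELECT workflowId, workflowName, numJobsCompleted, numJobsTotal
+--   FROM workflow
+--   ORDER BY lastUpdateTime DESC;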

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml Wed Nov  7 08:49:58 2012
@@ -0,0 +1,50 @@
+
+<web-app
+  xmlns="http://java.sun.com/xml/ns/javaee" 
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd" 
+  version="2.5">
+
+  <context-param>
+    <param-name>eventdb.db.hostname</param-name>
+    <param-value>localhost</param-value>
+  </context-param>
+  <context-param>
+    <param-name>eventdb.db.name</param-name>
+    <param-value>ambari</param-value>
+  </context-param>
+  <context-param>
+    <param-name>eventdb.db.user</param-name>
+    <param-value>dbuser</param-value>
+  </context-param>
+  <context-param>
+    <param-name>eventdb.db.password</param-name>
+    <param-value></param-value>
+  </context-param>
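+  <!-- The eventdb.db.* parameters above presumably supply the JDBC
+       connection settings consumed by the eventdb web service classes. -->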
+
+  <servlet>
+    <servlet-name>Workflow JSON Servlet</servlet-name>
+    <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+    <init-param>
+      <param-name>com.sun.jersey.config.property.packages</param-name>
+      <param-value>org.apache.ambari.eventdb.webservice</param-value>
+    </init-param>
+    <load-on-startup>1</load-on-startup>
+  </servlet>
+
+  <!-- Add jsp files here
+  <servlet>
+    <servlet-name></servlet-name>
+    <jsp-file></jsp-file>
+  </servlet>
+  -->
+
+  <servlet-mapping>
+    <servlet-name>Workflow JSON Servlet</servlet-name>
+    <url-pattern>/wf/*</url-pattern>
+  </servlet-mapping>
+
+</web-app>

Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java Wed Nov  7 08:49:58 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
+import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Tests that workflow information set in a job configuration survives the
+ * round trip through job-history log lines and
+ * MapReduceJobHistoryUpdater.buildWorkflowContext().
+ */
+public class TestJobHistoryParsing extends TestCase {
+  static final char LINE_DELIMITER_CHAR = '.';
+  static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
+  private static final char DELIMITER = ' ';
+  private static final String ID = "WORKFLOW_ID";
+  private static final String NAME = "WORKFLOW_NAME";
+  private static final String NODE = "WORKFLOW_NODE_NAME";
+  private static final String ADJ = "WORKFLOW_ADJACENCIES";
+  private static final String ID_PROP = "mapreduce.workflow.id";
+  private static final String NAME_PROP = "mapreduce.workflow.name";
+  private static final String NODE_PROP = "mapreduce.workflow.node.name";
+  private static final String ADJ_PROP = "mapreduce.workflow.adjacency";
+  
+  public void test1() {
+    Map<String,String[]> adj = new HashMap<String,String[]>();
+    adj.put("10", new String[] {"20", "30"});
+    adj.put("20", new String[] {"30"});
+    adj.put("30", new String[] {});
+    test("id_0-1", "something.name", "10", adj);
+  }
+  
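+  // test2 exercises escaping: ids, node names, and targets contain '=',
+  // spaces, quotes, and the '.' line delimiter, all of which must round-trip.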
+  public void test2() {
+    Map<String,String[]> adj = new HashMap<String,String[]>();
+    adj.put("1=0", new String[] {"2 0", "3\"0."});
+    adj.put("2 0", new String[] {"3\"0."});
+    adj.put("3\"0.", new String[] {});
+    test("id_= 0-1", "something.name", "1=0", adj);
+  }
+  
+  public void test(String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adjacencies) {
+    Configuration conf = new Configuration();
+    setProperties(conf, workflowId, workflowName, workflowNodeName, adjacencies);
+    String log = log("JOB", new String[] {ID, NAME, NODE, ADJ},
+        new String[] {conf.get(ID_PROP), conf.get(NAME_PROP), conf.get(NODE_PROP), JobHistory.JobInfo.getWorkflowAdjacencies(conf)});
+    ParsedLine line = new ParsedLine(log);
+    JobSubmittedEvent event = new JobSubmittedEvent(null, "", "", 0L, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ));
+    WorkflowContext context = MapReduceJobHistoryUpdater.buildWorkflowContext(event);
+    assertEquals("Didn't recover workflowId", workflowId, context.getWorkflowId());
+    assertEquals("Didn't recover workflowName", workflowName, context.getWorkflowName());
+    assertEquals("Didn't recover workflowNodeName", workflowNodeName, context.getWorkflowEntityName());
+    assertEquals("Got incorrect number of adjacencies", adjacencies.size(), context.getWorkflowDag().getEntries().size());
+    for (WorkflowDagEntry entry : context.getWorkflowDag().getEntries()) {
+      String[] sTargets = adjacencies.get(entry.getSource());
+      assertNotNull("No original targets for " + entry.getSource(), sTargets);
+      List<String> dTargets = entry.getTargets();
+      assertEquals("Got incorrect number of targets for " + entry.getSource(), sTargets.length, dTargets.size());
+      for (int i = 0; i < sTargets.length; i++) {
+        assertEquals("Got incorrect target for " + entry.getSource(), sTargets[i], dTargets.get(i));
+      }
+    }
+  }
+  
+  private static void setProperties(Configuration conf, String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adj) {
+    conf.set(ID_PROP, workflowId);
+    conf.set(NAME_PROP, workflowName);
+    conf.set(NODE_PROP, workflowNodeName);
+    for (Entry<String,String[]> entry : adj.entrySet()) {
+      conf.setStrings(ADJ_PROP + "." + entry.getKey(), entry.getValue());
+    }
+  }
+  
+  private static String log(String recordType, String[] keys, String[] values) {
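+    // Builds a history line of the form: TYPE KEY="escapedValue" ... '.'
+    // mirroring the JobHistory file format the updater parses.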
+    int length = recordType.length() + keys.length * 4 + 2;
+    for (int i = 0; i < keys.length; i++) {
+      values[i] = StringUtils.escapeString(values[i], StringUtils.ESCAPE_CHAR, charsToEscape);
+      length += values[i].length() + keys[i].length();
+    }
+    
+    // We have the length of the buffer, now construct it.
+    StringBuilder builder = new StringBuilder(length);
+    builder.append(recordType);
+    builder.append(DELIMITER);
+    for (int i = 0; i < keys.length; i++) {
+      builder.append(keys[i]);
+      builder.append("=\"");
+      builder.append(values[i]);
+      builder.append("\"");
+      builder.append(DELIMITER);
+    }
+    builder.append(LINE_DELIMITER_CHAR);
+    
+    return builder.toString();
+  }
+  
+  private static class ParsedLine {
+    static final String KEY = "(\\w+)";
+    static final String VALUE = "([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)";
+    static final Pattern keyValPair = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+    Map<String,String> props = new HashMap<String,String>();
+    private String type;
+    
+    ParsedLine(String fullLine) {
+      int firstSpace = fullLine.indexOf(" ");
+      
+      if (firstSpace < 0) {
+        firstSpace = fullLine.length();
+      }
+      
+      if (firstSpace == 0) {
+        return; // This is a junk line of some sort
+      }
+      
+      type = fullLine.substring(0, firstSpace);
+      
+      String propValPairs = fullLine.substring(firstSpace + 1);
+      
+      Matcher matcher = keyValPair.matcher(propValPairs);
+      
+      while (matcher.find()) {
+        String key = matcher.group(1);
+        String value = matcher.group(2);
+        props.put(key, value);
+      }
+    }
+    
+    protected String getType() {
+      return type;
+    }
+    
+    protected String get(String key) {
+      return props.get(key);
+    }
+  }
+}