You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ma...@apache.org on 2012/11/07 09:50:03 UTC
svn commit: r1406498 [8/8] - in /incubator/ambari/branches/AMBARI-666: ./
ambari-agent/src/main/puppet/modules/hdp/manifests/
ambari-agent/src/main/puppet/modules/hdp/manifests/lib/
ambari-agent/src/main/python/ambari_agent/ ambari-server/src/main/java...
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowContext.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.model;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WorkflowContext {
+
+ private String workflowId;
+ private String workflowName;
+ private String workflowEntityName;
+
+ private WorkflowDag workflowDag;
+
+ private WorkflowContext parentWorkflowContext;
+
+ public WorkflowContext() {
+ /* Required by JAXB. */
+ }
+
+ /* Getters. */
+ public String getWorkflowId() {
+ return this.workflowId;
+ }
+
+ public String getWorkflowName() {
+ return this.workflowName;
+ }
+
+ public String getWorkflowEntityName() {
+ return this.workflowEntityName;
+ }
+
+ public WorkflowDag getWorkflowDag() {
+ return this.workflowDag;
+ }
+
+ public WorkflowContext getParentWorkflowContext() {
+ return this.parentWorkflowContext;
+ }
+
+ /* Setters. */
+ public void setWorkflowId(String wfId) {
+ this.workflowId = wfId;
+ }
+
+ public void setWorkflowName(String wfName) {
+ this.workflowName = wfName;
+ }
+
+ public void setWorkflowEntityName(String wfEntityName) {
+ this.workflowEntityName = wfEntityName;
+ }
+
+ public void setWorkflowDag(WorkflowDag wfDag) {
+ this.workflowDag = wfDag;
+ }
+
+ public void setParentWorkflowContext(WorkflowContext pWfContext) {
+ this.parentWorkflowContext = pWfContext;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/WorkflowDag.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.model;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WorkflowDag {
+
+ public static class WorkflowDagEntry {
+
+ private String source;
+ private List<String> targets = new ArrayList<String>();
+
+ public WorkflowDagEntry() {
+ /* Required by JAXB. */
+ }
+
+ /* Getters. */
+ public String getSource() {
+ return this.source;
+ }
+
+ public List<String> getTargets() {
+ return this.targets;
+ }
+
+ /* Setters. */
+ public void setSource(String source) {
+ this.source = source;
+ }
+
+ public void setTargets(List<String> targets) {
+ this.targets = targets;
+ }
+
+ public void addTarget(String target) {
+ this.targets.add(target);
+ }
+ }
+
+ List<WorkflowDagEntry> entries = new ArrayList<WorkflowDagEntry>();
+
+ public WorkflowDag() {
+ /* Required by JAXB. */
+ }
+
+ /* Getters. */
+ public List<WorkflowDagEntry> getEntries() {
+ return this.entries;
+ }
+
+ /* Setters. */
+ public void setEntries(List<WorkflowDagEntry> entries) {
+ this.entries = entries;
+ }
+
+ public void addEntry(WorkflowDag.WorkflowDagEntry entry) {
+ this.entries.add(entry);
+ }
+
+ public int size() {
+ Set<String> nodes = new HashSet<String>();
+ for (WorkflowDagEntry entry : entries) {
+ nodes.add(entry.getSource());
+ nodes.addAll(entry.getTargets());
+ }
+ return nodes.size();
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/model/Workflows.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.eventdb.model;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.commons.lang.StringUtils;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class Workflows {
+ List<WorkflowDBEntry> workflows;
+
+ public static class WorkflowDBEntry {
+ public static enum WorkflowFields {
+ WORKFLOWID,
+ WORKFLOWNAME,
+ USERNAME,
+ STARTTIME,
+ LASTUPDATETIME,
+ NUMJOBSTOTAL,
+ NUMJOBSCOMPLETED,
+ PARENTWORKFLOWID,
+ WORKFLOWCONTEXT;
+
+ public String getString(ResultSet rs) throws SQLException {
+ return rs.getString(this.toString());
+ }
+
+ public int getInt(ResultSet rs) throws SQLException {
+ return rs.getInt(this.toString());
+ }
+
+ public long getLong(ResultSet rs) throws SQLException {
+ return rs.getLong(this.toString());
+ }
+
+ public static String join() {
+ String[] tmp = new String[WorkflowFields.values().length];
+ for (int i = 0; i < tmp.length; i++)
+ tmp[i] = WorkflowFields.values()[i].toString();
+ return StringUtils.join(tmp, ",");
+ }
+ }
+
+ @XmlTransient
+ public static String WORKFLOW_FIELDS = WorkflowFields.join();
+
+ private String workflowId;
+ private String workflowName;
+ private String userName;
+ private long startTime;
+ private long elapsedTime;
+ private int numJobsTotal;
+ private int numJobsCompleted;
+ private String parentWorkflowId;
+ private WorkflowContext workflowContext;
+
+ public WorkflowDBEntry() {
+ /* Required by JAXB. */
+ }
+
+ public String getWorkflowId() {
+ return workflowId;
+ }
+
+ public String getWorkflowName() {
+ return workflowName;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ public int getNumJobsTotal() {
+ return numJobsTotal;
+ }
+
+ public int getNumJobsCompleted() {
+ return numJobsCompleted;
+ }
+
+ public String getParentWorkflowId() {
+ return parentWorkflowId;
+ }
+
+ public WorkflowContext getWorkflowContext() {
+ return workflowContext;
+ }
+
+ public void setWorkflowId(String workflowId) {
+ this.workflowId = workflowId;
+ }
+
+ public void setWorkflowName(String workflowName) {
+ this.workflowName = workflowName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ public void setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ }
+
+ public void setNumJobsTotal(int numJobsTotal) {
+ this.numJobsTotal = numJobsTotal;
+ }
+
+ public void setNumJobsCompleted(int numJobsCompleted) {
+ this.numJobsCompleted = numJobsCompleted;
+ }
+
+ public void setParentWorkflowId(String parentWorkflowId) {
+ this.parentWorkflowId = parentWorkflowId;
+ }
+
+ public void setWorkflowContext(WorkflowContext workflowContext) {
+ this.workflowContext = workflowContext;
+ }
+ }
+
+ public Workflows() {}
+
+ public List<WorkflowDBEntry> getWorkflows() {
+ return workflows;
+ }
+
+ public void setWorkflows(List<WorkflowDBEntry> workflows) {
+ this.workflows = workflows;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/JAXBContextResolver.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.eventdb.webservice;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.ws.rs.ext.Provider;
+import javax.ws.rs.ext.ContextResolver;
+import javax.xml.bind.JAXBContext;
+
+import org.apache.ambari.eventdb.model.Jobs;
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag;
+import org.apache.ambari.eventdb.model.Workflows;
+
+@Provider
+public class JAXBContextResolver implements ContextResolver<JAXBContext> {
+
+ /* NOTE: Remember to add any new Model classes to this list. */
+ private static final Class[] classes = {
+ WorkflowContext.class,
+ WorkflowDag.class,
+ WorkflowDag.WorkflowDagEntry.class,
+ Jobs.class,
+ Jobs.JobDBEntry.class,
+ Workflows.class,
+ Workflows.WorkflowDBEntry.class
+ };
+
+ private static final Set<Class> types =
+ new HashSet<Class>(Arrays.asList(classes));
+
+ private static final JAXBContext context;
+
+ static {
+ JAXBContext tmpContext;
+
+ try {
+ tmpContext = new JSONJAXBContext(JSONConfiguration.natural().build(), classes);
+ } catch (Exception e) {
+ /* Do Nothing (with the exception). */
+ tmpContext = null;
+ }
+
+ context = tmpContext;
+ }
+
+ @Override
+ public JAXBContext getContext(Class<?> classType) {
+ return (types.contains(classType)) ? context : null;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/eventdb/webservice/WorkflowJsonService.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.eventdb.webservice;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+
+import org.apache.ambari.eventdb.db.PostgresConnector;
+import org.apache.ambari.eventdb.model.Jobs;
+import org.apache.ambari.eventdb.model.Jobs.JobDBEntry;
+import org.apache.ambari.eventdb.model.Workflows;
+import org.apache.ambari.eventdb.model.Workflows.WorkflowDBEntry;
+
+@Path("/json")
+public class WorkflowJsonService {
+ private static final String PREFIX = "eventdb.";
+ private static final String HOSTNAME = PREFIX + "db.hostname";
+ private static final String DBNAME = PREFIX + "db.name";
+ private static final String USERNAME = PREFIX + "db.user";
+ private static final String PASSWORD = PREFIX + "db.password";
+
+ private static final List<WorkflowDBEntry> EMPTY_WORKFLOWS = Collections.emptyList();
+ private static final List<JobDBEntry> EMPTY_JOBS = Collections.emptyList();
+
+ @Context
+ ServletContext servletContext;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/workflow")
+ public Workflows getWorkflows() {
+ Workflows workflows = new Workflows();
+ try {
+ PostgresConnector conn = new PostgresConnector(servletContext.getInitParameter(HOSTNAME), servletContext.getInitParameter(DBNAME),
+ servletContext.getInitParameter(USERNAME), servletContext.getInitParameter(PASSWORD));
+ workflows.setWorkflows(conn.fetchWorkflows());
+ conn.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ workflows.setWorkflows(EMPTY_WORKFLOWS);
+ }
+ return workflows;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/job")
+ public Jobs getJobs(@QueryParam("workflowId") String workflowId) {
+ Jobs jobs = new Jobs();
+ try {
+ PostgresConnector conn = new PostgresConnector(servletContext.getInitParameter(HOSTNAME), servletContext.getInitParameter(DBNAME),
+ servletContext.getInitParameter(USERNAME), servletContext.getInitParameter(PASSWORD));
+ jobs.setJobs(conn.fetchJobDetails(workflowId));
+ conn.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ jobs.setJobs(EMPTY_JOBS);
+ }
+ return jobs;
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogParser.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogParser {
+
+ void addEventToParse(LoggingEvent event);
+
+ Object getParseResult() throws IOException;
+
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStore.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogStore {
+
+ void persist(LoggingEvent originalEvent, Object parsedEvent)
+ throws IOException;
+
+ void close() throws IOException;
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LogStoreUpdateProvider.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+import java.sql.Connection;
+
+import org.apache.log4j.spi.LoggingEvent;
+
+public interface LogStoreUpdateProvider {
+
+ void init(Connection connection) throws IOException;
+
+ void update(LoggingEvent originalEvent, Object parsedEvent)
+ throws IOException;
+
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/LoggingThreadRunnable.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class LoggingThreadRunnable implements Runnable {
+ private static final Log LOG = LogFactory.getLog(LoggingThreadRunnable.class);
+
+ private final Queue<LoggingEvent> events;
+ private final LogParser parser;
+ private final LogStore store;
+ private final AtomicBoolean done = new AtomicBoolean(false);
+
+ public LoggingThreadRunnable(
+ Queue<LoggingEvent> events,
+ LogParser parser,
+ LogStore provider) {
+ this.events = events;
+ this.store = provider;
+ this.parser = parser;
+ }
+
+ @Override
+ public void run() {
+ while (!done.get()) {
+ LoggingEvent event = null;
+ while ((event = events.poll()) != null) {
+ Object result = null;
+ try {
+ parser.addEventToParse(event);
+ while ((result = parser.getParseResult()) != null) {
+ try {
+ store.persist(event, result);
+ } catch (IOException e) {
+ LOG.warn("Failed to persist " + result);
+ }
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to parse log-event: " + event);
+ }
+ }
+ }
+ try {
+ store.close();
+ } catch (IOException ioe) {
+ LOG.info("Failed to close logStore", ioe);
+ }
+ }
+
+ public void close() throws IOException {
+ done.set(true);
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/common/store/DatabaseStore.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.common.store;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+import org.apache.ambari.log4j.common.LogStore;
+import org.apache.ambari.log4j.common.LogStoreUpdateProvider;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class DatabaseStore implements LogStore {
+
+ final private String database;
+ final private String user;
+ final private String password;
+
+ final private Connection connection;
+ final private LogStoreUpdateProvider updateProvider;
+
+ public DatabaseStore(String driver,
+ String database, String user, String password,
+ LogStoreUpdateProvider updateProvider)
+ throws IOException {
+ try {
+ Class.forName(driver);
+ } catch (ClassNotFoundException e) {
+ System.err.println("Can't load driver - " + driver);
+ throw new RuntimeException("Can't load driver - " + driver);
+ }
+ this.database = database;
+ this.user = (user == null) ? "" : user;
+ this.password = (password == null) ? "" : password;
+ try {
+ this.connection =
+ DriverManager.getConnection(this.database, this.user, this.password);
+ } catch (SQLException sqle) {
+ throw new IOException("Can't connect to database " + this.database, sqle);
+ }
+ this.updateProvider = updateProvider;
+
+ this.updateProvider.init(this.connection);
+ }
+
+ @Override
+ public void persist(LoggingEvent originalEvent, Object parsedEvent)
+ throws IOException {
+ updateProvider.update(originalEvent, parsedEvent);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ connection.close();
+ } catch (SQLException sqle) {
+ throw new IOException(
+ "Failed to close connectionto database " + this.database, sqle);
+ }
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/JobHistoryAppender.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.ambari.log4j.common.LogParser;
+import org.apache.ambari.log4j.common.LogStore;
+import org.apache.ambari.log4j.common.LoggingThreadRunnable;
+import org.apache.ambari.log4j.common.store.DatabaseStore;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.rumen.HistoryEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Appender;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class JobHistoryAppender extends AppenderSkeleton implements Appender {
+
+ private static final Log LOG = LogFactory.getLog(JobHistoryAppender.class);
+
+ private final Queue<LoggingEvent> events;
+ private LoggingThreadRunnable logThreadRunnable;
+ private Thread logThread;
+
+ private final LogParser logParser;
+
+ private final LogStore nullStore =
+ new LogStore() {
+ @Override
+ public void persist(LoggingEvent originalEvent, Object parsedEvent)
+ throws IOException {
+ LOG.info(((HistoryEvent)parsedEvent).toString());
+ }
+
+ @Override
+ public void close() throws IOException {}
+ };
+
+ private String driver;
+ private String database;
+ private String user;
+ private String password;
+
+ private LogStore logStore;
+
+ public JobHistoryAppender() {
+ events = new LinkedBlockingQueue<LoggingEvent>();
+ logParser = new MapReduceJobHistoryParser();
+ logStore = nullStore;
+ }
+
+ /* Getters & Setters for log4j */
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public void setDatabase(String database) {
+ this.database = database;
+ }
+
+ public String getDriver() {
+ return driver;
+ }
+
+ public void setDriver(String driver) {
+ this.driver = driver;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public void setUser(String user) {
+ this.user = user;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ /* --------------------------- */
+
+ @Override
+ public void activateOptions() {
+ synchronized (this) {
+ //if (true) {
+ if (database.equals("none")) {
+ logStore = nullStore;
+ LOG.info("database set to 'none'");
+ } else {
+ try {
+ LOG.info("Connecting to database " + database +
+ " as user " + user + " password " + password +
+ " and driver " + driver);
+ logStore =
+ new DatabaseStore(driver, database, user, password,
+ new MapReduceJobHistoryUpdater());
+ } catch (IOException ioe) {
+ LOG.info("Failed to connect to db " + database, ioe);
+ System.err.println("Failed to connect to db " + database +
+ " as user " + user + " password " + password +
+ " and driver " + driver + " with " +
+ StringUtils.stringifyException(ioe));
+ throw new RuntimeException(
+ "Failed to create database store for " + database, ioe);
+ } catch (Exception e) {
+ LOG.info("Failed to connect to db " + database, e);
+ System.err.println("Failed to connect to db " + database +
+ " as user " + user + " password " + password +
+ " and driver " + driver + " with " +
+ StringUtils.stringifyException(e));
+ throw new RuntimeException(
+ "Failed to create database store for " + database, e);
+ }
+ }
+ logThreadRunnable =
+ new LoggingThreadRunnable(events, logParser, logStore);
+ logThread = new Thread(logThreadRunnable);
+ logThread.start();
+
+ super.activateOptions();
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ logThreadRunnable.close();
+ } catch (IOException ioe) {
+ LOG.info("Failed to close logThreadRunnable", ioe);
+ }
+ try {
+ logThread.join(1000);
+ } catch (InterruptedException ie) {
+ LOG.info("logThread interrupted", ie);
+ }
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return false;
+ }
+
+ @Override
+ protected void append(LoggingEvent event) {
+ events.add(event);
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryParser.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.ambari.log4j.common.LogParser;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.tools.rumen.Hadoop20JHParser;
+import org.apache.hadoop.tools.rumen.JobHistoryParser;
+import org.apache.hadoop.util.LineReader;
+import org.apache.log4j.spi.LoggingEvent;
+
+public class MapReduceJobHistoryParser implements LogParser {
+ private JobHistoryParser parser;
+ private LogLineReader reader = new LogLineReader("Meta VERSION=\"1\" .");
+
+ public MapReduceJobHistoryParser() {
+ try {
+ parser = new Hadoop20JHParser(reader);
+ } catch (IOException ioe) {
+ // SHOULD NEVER HAPPEN!
+ throw new RuntimeException(ioe);
+ }
+ }
+
+ @Override
+ public void addEventToParse(LoggingEvent event) {
+ reader.addLine(event.getMessage().toString());
+ }
+
+ @Override
+ public Object getParseResult() throws IOException {
+ return parser.nextEvent();
+ }
+
+ static class LogLineReader extends LineReader {
+
+ private Queue<String> lines = new LinkedBlockingQueue<String>();
+
+ public LogLineReader(String line) {
+ super(null);
+ addLine(line);
+ }
+
+ private void addLine(String line) {
+ lines.add(line);
+ }
+
+ public int readLine(Text str) throws IOException {
+ String line = lines.poll();
+ if (line != null) {
+ str.set(line);
+ return line.length();
+ }
+
+ return 0;
+ }
+
+ public void close() throws IOException {
+ }
+ }
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/java/org/apache/ambari/log4j/hadoop/mapreduce/jobhistory/MapReduceJobHistoryUpdater.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,958 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.log4j.hadoop.mapreduce.jobhistory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag;
+import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
+import org.apache.ambari.log4j.common.LogStoreUpdateProvider;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.tools.rumen.HistoryEvent;
+import org.apache.hadoop.tools.rumen.JobFinishedEvent;
+import org.apache.hadoop.tools.rumen.JobInfoChangeEvent;
+import org.apache.hadoop.tools.rumen.JobInitedEvent;
+import org.apache.hadoop.tools.rumen.JobStatusChangedEvent;
+import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
+import org.apache.hadoop.tools.rumen.JobUnsuccessfulCompletionEvent;
+import org.apache.hadoop.tools.rumen.MapAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptStartedEvent;
+import org.apache.hadoop.tools.rumen.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.tools.rumen.TaskFailedEvent;
+import org.apache.hadoop.tools.rumen.TaskFinishedEvent;
+import org.apache.hadoop.tools.rumen.TaskStartedEvent;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.spi.LoggingEvent;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class MapReduceJobHistoryUpdater implements LogStoreUpdateProvider {
+
+ private static final Log LOG =
+ LogFactory.getLog(MapReduceJobHistoryUpdater.class);
+
+ private Connection connection;
+
+ private static final String WORKFLOW_TABLE = "workflow";
+ private static final String JOB_TABLE = "job";
+ private static final String TASK_TABLE = "task";
+ private static final String TASKATTEMPT_TABLE = "taskAttempt";
+
+ private PreparedStatement workflowPS = null;
+ private PreparedStatement workflowSelectPS = null;
+ private PreparedStatement workflowUpdateTimePS = null;
+ private PreparedStatement workflowUpdateNumCompletedPS = null;
+
+ private Map<Class<? extends HistoryEvent>, PreparedStatement> entitySqlMap =
+ new HashMap<Class<? extends HistoryEvent>, PreparedStatement>();
+
+ @Override
+ public void init(Connection connection) throws IOException {
+ this.connection = connection;
+
+ try {
+ initializePreparedStatements();
+ } catch (SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
+
+ private void initializePreparedStatements() throws SQLException {
+ initializeJobPreparedStatements();
+ initializeTaskPreparedStatements();
+ initializeTaskAttemptPreparedStatements();
+ }
+
+ private PreparedStatement jobEndUpdate;
+
+ private void initializeJobPreparedStatements() throws SQLException {
+
+ /**
+ * Job events
+ */
+
+ // JobSubmittedEvent
+
+ PreparedStatement jobSubmittedPrepStmnt =
+ connection.prepareStatement(
+ "INSERT INTO " +
+ JOB_TABLE +
+ " (" +
+ "jobId, " +
+ "jobName, " +
+ "userName, " +
+ "confPath, " +
+ "queue, " +
+ "submitTime, " +
+ "workflowId, " +
+ "workflowEntityName " +
+ ") " +
+ "VALUES" +
+ " (?, ?, ?, ?, ?, ?, ?, ?)"
+ );
+ entitySqlMap.put(JobSubmittedEvent.class, jobSubmittedPrepStmnt);
+
+ workflowSelectPS =
+ connection.prepareStatement(
+ "SELECT workflowId FROM " + WORKFLOW_TABLE + " where workflowId = ?"
+ );
+
+ workflowPS =
+ connection.prepareStatement(
+ "INSERT INTO " +
+ WORKFLOW_TABLE +
+ " (" +
+ "workflowId, " +
+ "workflowName, " +
+ "workflowContext, " +
+ "userName, " +
+ "startTime, " +
+ "lastUpdateTime, " +
+ "numJobsTotal, " +
+ "numJobsCompleted" +
+ ") " +
+ "VALUES" +
+ " (?, ?, ?, ?, ?, ?, ?, ?)"
+ );
+
+ workflowUpdateTimePS =
+ connection.prepareStatement(
+ "UPDATE " +
+ WORKFLOW_TABLE +
+ " SET " +
+ "lastUpdateTime = ? " +
+ "WHERE workflowId = ?"
+ );
+
+ workflowUpdateNumCompletedPS =
+ connection.prepareStatement(
+ "UPDATE " +
+ WORKFLOW_TABLE +
+ " SET " +
+ "lastUpdateTime = ?, " +
+ "numJobsCompleted = numJobsCompleted + 1 " +
+ "WHERE workflowId = " +
+ "(SELECT workflowId FROM " +
+ JOB_TABLE +
+ " WHERE jobId = ?)"
+ );
+
+ // JobFinishedEvent
+
+ PreparedStatement jobFinishedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ "finishTime = ?, " +
+ "finishedMaps = ?, " +
+ "finishedReduces= ?, " +
+ "failedMaps = ?, " +
+ "failedReduces = ?, " +
+ "inputBytes = ?, " +
+ "outputBytes = ? " +
+ "WHERE " +
+ "jobId = ?"
+ );
+ entitySqlMap.put(JobFinishedEvent.class, jobFinishedPrepStmnt);
+
+ // JobInitedEvent
+
+ PreparedStatement jobInitedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ "launchTime = ?, " +
+ "maps = ?, " +
+ "reduces = ?, " +
+ "status = ? "+
+ "WHERE " +
+ "jobId = ?"
+ );
+ entitySqlMap.put(JobInitedEvent.class, jobInitedPrepStmnt);
+
+ // JobStatusChangedEvent
+
+ PreparedStatement jobStatusChangedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ "status = ? "+
+ "WHERE " +
+ "jobId = ?"
+ );
+ entitySqlMap.put(JobStatusChangedEvent.class, jobStatusChangedPrepStmnt);
+
+ // JobInfoChangedEvent
+
+ PreparedStatement jobInfoChangedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ "submitTime = ?, " +
+ "launchTime = ? " +
+ "WHERE " +
+ "jobId = ?"
+ );
+ entitySqlMap.put(JobInfoChangeEvent.class, jobInfoChangedPrepStmnt);
+
+ // JobUnsuccessfulCompletionEvent
+ PreparedStatement jobUnsuccessfulPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ "finishTime = ?, " +
+ "finishedMaps = ?, " +
+ "finishedReduces = ?, " +
+ "status = ? " +
+ "WHERE " +
+ "jobId = ?"
+ );
+ entitySqlMap.put(
+ JobUnsuccessfulCompletionEvent.class, jobUnsuccessfulPrepStmnt);
+
+ // Job update at the end
+ jobEndUpdate =
+ connection.prepareStatement(
+ "UPDATE " +
+ JOB_TABLE +
+ " SET " +
+ " mapsRuntime = (" +
+ "SELECT " +
+ "SUM(" +
+ TASKATTEMPT_TABLE + ".finishTime" + " - " +
+ TASKATTEMPT_TABLE + ".startTime" +
+ ")" +
+ " FROM " +
+ TASKATTEMPT_TABLE +
+ " WHERE " +
+ TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
+ " AND " +
+ TASKATTEMPT_TABLE + ".taskType = ?)" +
+ ", " +
+ " reducesRuntime = (" +
+ "SELECT SUM(" +
+ TASKATTEMPT_TABLE + ".finishTime" + " - " +
+ TASKATTEMPT_TABLE + ".startTime" +
+ ")" +
+ " FROM " +
+ TASKATTEMPT_TABLE +
+ " WHERE " +
+ TASKATTEMPT_TABLE + ".jobId = " + JOB_TABLE + ".jobId " +
+ " AND " +
+ TASKATTEMPT_TABLE + ".taskType = ?) " +
+ " WHERE " +
+ "jobId = ?"
+ );
+ }
+
+ private void initializeTaskPreparedStatements() throws SQLException {
+
+ /**
+ * Task events
+ */
+
+ // TaskStartedEvent
+
+ PreparedStatement taskStartedPrepStmnt =
+ connection.prepareStatement(
+ "INSERT INTO " +
+ TASK_TABLE +
+ " (" +
+ "jobId, " +
+ "taskType, " +
+ "splits, " +
+ "startTime, " +
+ "taskId" +
+ ") " +
+ "VALUES (?, ?, ?, ?, ?)"
+ );
+ entitySqlMap.put(TaskStartedEvent.class, taskStartedPrepStmnt);
+
+ // TaskFinishedEvent
+
+ PreparedStatement taskFinishedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASK_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskType = ?, " +
+ "status = ?, " +
+ "finishTime = ? " +
+ " WHERE " +
+ "taskId = ?"
+ );
+ entitySqlMap.put(TaskFinishedEvent.class, taskFinishedPrepStmnt);
+
+ // TaskFailedEvent
+
+ PreparedStatement taskFailedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASK_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskType = ?, " +
+ "status = ?, " +
+ "finishTime = ?, " +
+ "error = ?, " +
+ "failedAttempt = ? " +
+ "WHERE " +
+ "taskId = ?"
+ );
+ entitySqlMap.put(TaskFailedEvent.class, taskFailedPrepStmnt);
+ }
+
+ private void initializeTaskAttemptPreparedStatements() throws SQLException {
+
+ /**
+ * TaskAttempt events
+ */
+
+ // TaskAttemptStartedEvent
+
+ PreparedStatement taskAttemptStartedPrepStmnt =
+ connection.prepareStatement(
+ "INSERT INTO " +
+ TASKATTEMPT_TABLE +
+ " (" +
+ "jobId, " +
+ "taskId, " +
+ "taskType, " +
+ "startTime, " +
+ "taskTracker, " +
+ "locality, " +
+ "avataar, " +
+ "taskAttemptId" +
+ ") " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
+ );
+ entitySqlMap.put(
+ TaskAttemptStartedEvent.class, taskAttemptStartedPrepStmnt);
+
+ // TaskAttemptFinishedEvent
+
+ PreparedStatement taskAttemptFinishedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASKATTEMPT_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskId = ?, " +
+ "taskType = ?, " +
+ "finishTime = ?, " +
+ "status = ?, " +
+ "taskTracker = ? " +
+ " WHERE " +
+ "taskAttemptId = ?"
+ );
+ entitySqlMap.put(
+ TaskAttemptFinishedEvent.class, taskAttemptFinishedPrepStmnt);
+
+ // TaskAttemptUnsuccessfulEvent
+
+ PreparedStatement taskAttemptUnsuccessfulPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASKATTEMPT_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskId = ?, " +
+ "taskType = ?, " +
+ "finishTime = ?, " +
+ "status = ?, " +
+ "taskTracker = ?, " +
+ "error = ? " +
+ " WHERE " +
+ "taskAttemptId = ?"
+ );
+ entitySqlMap.put(
+ TaskAttemptUnsuccessfulCompletionEvent.class,
+ taskAttemptUnsuccessfulPrepStmnt);
+
+ // MapAttemptFinishedEvent
+
+ PreparedStatement mapAttemptFinishedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASKATTEMPT_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskId = ?, " +
+ "taskType = ?, " +
+ "mapFinishTime = ?, " +
+ "finishTime = ?, " +
+ "status = ?, " +
+ "taskTracker = ? " +
+ " WHERE " +
+ "taskAttemptId = ?"
+ );
+ entitySqlMap.put(
+ MapAttemptFinishedEvent.class, mapAttemptFinishedPrepStmnt);
+
+ // ReduceAttemptFinishedEvent
+
+ PreparedStatement reduceAttemptFinishedPrepStmnt =
+ connection.prepareStatement(
+ "UPDATE " +
+ TASKATTEMPT_TABLE +
+ " SET " +
+ "jobId = ?, " +
+ "taskId = ?, " +
+ "taskType = ?, " +
+ "shuffleFinishTime = ?, " +
+ "sortFinishTime = ?, " +
+ "finishTime = ?, " +
+ "status = ?, " +
+ "taskTracker = ? " +
+ " WHERE " +
+ "taskAttemptId = ?"
+ );
+ entitySqlMap.put(
+ ReduceAttemptFinishedEvent.class, reduceAttemptFinishedPrepStmnt);
+ }
+
+ private void doUpdates(LoggingEvent originalEvent,
+ Object parsedEvent) throws SQLException {
+ Class<?> eventClass = parsedEvent.getClass();
+
+ PreparedStatement entityPS = entitySqlMap.get(eventClass);
+ if (entityPS == null) {
+ LOG.debug("No prepared statement for " + eventClass);
+ return;
+ }
+
+ if (eventClass == JobSubmittedEvent.class) {
+ processJobSubmittedEvent(entityPS, workflowSelectPS, workflowPS,
+ workflowUpdateTimePS, originalEvent,
+ (JobSubmittedEvent)parsedEvent);
+ } else if (eventClass == JobFinishedEvent.class) {
+ processJobFinishedEvent(entityPS,
+ originalEvent, (JobFinishedEvent)parsedEvent);
+ } else if (eventClass == JobInitedEvent.class){
+ processJobInitedEvent(entityPS,
+ originalEvent, (JobInitedEvent)parsedEvent);
+ } else if (eventClass == JobStatusChangedEvent.class) {
+ processJobStatusChangedEvent(entityPS, workflowUpdateNumCompletedPS,
+ originalEvent, (JobStatusChangedEvent)parsedEvent);
+ } else if (eventClass == JobInfoChangeEvent.class) {
+ processJobInfoChangeEvent(entityPS,
+ originalEvent, (JobInfoChangeEvent)parsedEvent);
+ } else if (eventClass == JobUnsuccessfulCompletionEvent.class) {
+ processJobUnsuccessfulEvent(entityPS,
+ originalEvent, (JobUnsuccessfulCompletionEvent)parsedEvent);
+ } else if (eventClass == TaskStartedEvent.class) {
+ processTaskStartedEvent(entityPS,
+ originalEvent, (TaskStartedEvent)parsedEvent);
+ } else if (eventClass == TaskFinishedEvent.class) {
+ processTaskFinishedEvent(entityPS,
+ originalEvent, (TaskFinishedEvent)parsedEvent);
+ } else if (eventClass == TaskFailedEvent.class) {
+ processTaskFailedEvent(entityPS,
+ originalEvent, (TaskFailedEvent)parsedEvent);
+ } else if (eventClass == TaskAttemptStartedEvent.class) {
+ processTaskAttemptStartedEvent(entityPS,
+ originalEvent, (TaskAttemptStartedEvent)parsedEvent);
+ } else if (eventClass == TaskAttemptFinishedEvent.class) {
+ processTaskAttemptFinishedEvent(entityPS,
+ originalEvent, (TaskAttemptFinishedEvent)parsedEvent);
+ } else if (eventClass == TaskAttemptUnsuccessfulCompletionEvent.class) {
+ processTaskAttemptUnsuccessfulEvent(entityPS,
+ originalEvent, (TaskAttemptUnsuccessfulCompletionEvent)parsedEvent);
+ } else if (eventClass == MapAttemptFinishedEvent.class) {
+ processMapAttemptFinishedEvent(entityPS,
+ originalEvent, (MapAttemptFinishedEvent)parsedEvent);
+ } else if (eventClass == ReduceAttemptFinishedEvent.class) {
+ processReduceAttemptFinishedEvent(entityPS,
+ originalEvent, (ReduceAttemptFinishedEvent)parsedEvent);
+ }
+ }
+
+ private void updateJobStatsAtFinish(String jobId) {
+ try {
+ jobEndUpdate.setString(1, "MAP");
+ jobEndUpdate.setString(2, "REDUCE");
+ jobEndUpdate.setString(3, jobId);
+ jobEndUpdate.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to update mapsRuntime/reducesRuntime for " + jobId,
+ sqle);
+ }
+ }
+
+ private static WorkflowContext generateWorkflowContext(
+ JobSubmittedEvent historyEvent) {
+ WorkflowDag wfDag = new WorkflowDag();
+ WorkflowDagEntry wfDagEntry = new WorkflowDagEntry();
+ wfDagEntry.setSource("X");
+ wfDag.addEntry(wfDagEntry);
+
+ WorkflowContext wc = new WorkflowContext();
+ wc.setWorkflowId(historyEvent.getJobId().toString().replace("job_", "mr_"));
+ wc.setWorkflowName(historyEvent.getJobName());
+ wc.setWorkflowEntityName("X");
+ wc.setWorkflowDag(wfDag);
+ return wc;
+ }
+
+ // this is based on the regex in org.apache.hadoop.tools.rumen.ParsedLine
+ // except this assumes the format "key"="value" so that both key and value
+ // are quoted and may contain escaped characters
+ private static final Pattern adjPattern =
+ Pattern.compile("\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\"" + "=" +
+ "\"([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)\" ");
+
+ public static WorkflowContext buildWorkflowContext(JobSubmittedEvent historyEvent) {
+ String workflowId = historyEvent.getWorkflowId()
+ .replace("\\", "");
+ if (workflowId.isEmpty())
+ return generateWorkflowContext(historyEvent);
+ String workflowName = historyEvent.getWorkflowName()
+ .replace("\\", "");
+ String workflowNodeName = historyEvent.getWorkflowNodeName()
+ .replace("\\", "");
+ String workflowAdjacencies = StringUtils.unEscapeString(
+ historyEvent.getWorkflowAdjacencies(),
+ StringUtils.ESCAPE_CHAR, new char[] {'"', '=', '.'});
+ WorkflowContext context = new WorkflowContext();
+ context.setWorkflowId(workflowId);
+ context.setWorkflowName(workflowName);
+ context.setWorkflowEntityName(workflowNodeName);
+ WorkflowDag dag = new WorkflowDag();
+ Matcher matcher = adjPattern.matcher(workflowAdjacencies);
+
+ while(matcher.find()){
+ WorkflowDagEntry dagEntry = new WorkflowDagEntry();
+ dagEntry.setSource(matcher.group(1).replace("\\", ""));
+ String[] values = StringUtils.getStrings(
+ matcher.group(2).replace("\\", ""));
+ if (values != null) {
+ for (String target : values) {
+ dagEntry.addTarget(target);
+ }
+ }
+ dag.addEntry(dagEntry);
+ }
+
+ context.setWorkflowDag(dag);
+ return context;
+ }
+
+ private void processJobSubmittedEvent(
+ PreparedStatement jobPS,
+ PreparedStatement workflowSelectPS, PreparedStatement workflowPS,
+ PreparedStatement workflowUpdateTimePS, LoggingEvent logEvent,
+ JobSubmittedEvent historyEvent) {
+
+ try {
+ String jobId = historyEvent.getJobId().toString();
+ jobPS.setString(1, jobId);
+ jobPS.setString(2, historyEvent.getJobName());
+ jobPS.setString(3, historyEvent.getUserName());
+ jobPS.setString(4, historyEvent.getJobConfPath());
+ jobPS.setString(5, historyEvent.getJobQueueName());
+ jobPS.setLong(6, historyEvent.getSubmitTime());
+
+ WorkflowContext workflowContext = buildWorkflowContext(historyEvent);
+
+ // Get workflow information
+ boolean insertWorkflow = false;
+
+ try {
+ workflowSelectPS.setString(1, workflowContext.getWorkflowId());
+ workflowSelectPS.execute();
+ ResultSet rs = workflowSelectPS.getResultSet();
+ insertWorkflow = !rs.next();
+ } catch (SQLException sqle) {
+ LOG.warn("workflow select failed with: ", sqle);
+ insertWorkflow = false;
+ }
+
+ // Insert workflow
+ if (insertWorkflow) {
+ WorkflowContext sanitizedWC = new WorkflowContext();
+ sanitizedWC.setWorkflowDag(workflowContext.getWorkflowDag());
+ sanitizedWC.setParentWorkflowContext(workflowContext.getParentWorkflowContext());
+
+ String sanitizedWCString = null;
+ try {
+ ObjectMapper om = new ObjectMapper();
+ sanitizedWCString = om.writeValueAsString(sanitizedWC);
+ } catch (IOException e) {
+ e.printStackTrace();
+ sanitizedWCString = "";
+ }
+
+ workflowPS.setString(1, workflowContext.getWorkflowId());
+ workflowPS.setString(2, workflowContext.getWorkflowName());
+ workflowPS.setString(3, sanitizedWCString);
+ workflowPS.setString(4, historyEvent.getUserName());
+ workflowPS.setLong(5, historyEvent.getSubmitTime());
+ workflowPS.setLong(6, historyEvent.getSubmitTime());
+ workflowPS.setLong(7, workflowContext.getWorkflowDag().size());
+ workflowPS.setLong(8, 0);
+ workflowPS.executeUpdate();
+ LOG.debug("Successfully inserted workflowId = " +
+ workflowContext.getWorkflowId());
+ } else {
+ workflowUpdateTimePS.setLong(1, historyEvent.getSubmitTime());
+ workflowUpdateTimePS.setString(2, workflowContext.getWorkflowId());
+ workflowUpdateTimePS.executeUpdate();
+ LOG.debug("Successfully updated workflowId = " +
+ workflowContext.getWorkflowId());
+ }
+
+ // Insert job
+ jobPS.setString(7, workflowContext.getWorkflowId());
+ jobPS.setString(8, workflowContext.getWorkflowEntityName());
+ jobPS.executeUpdate();
+ LOG.debug("Successfully inserted job = " + jobId +
+ " and workflowId = " + workflowContext.getWorkflowId());
+
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+ } catch (Exception e) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, e);
+ }
+ }
+
+ private void processJobFinishedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, JobFinishedEvent historyEvent) {
+ Counters counters = historyEvent.getMapCounters();
+ long inputBytes = 0;
+ if (counters != null) {
+ for (CounterGroup group : counters) {
+ for (Counter counter : group) {
+ if (counter.getName().equals("HDFS_BYTES_READ"))
+ inputBytes += counter.getValue();
+ }
+ }
+ }
+ if (historyEvent.getFinishedReduces() != 0)
+ counters = historyEvent.getReduceCounters();
+ long outputBytes = 0;
+ if (counters != null) {
+ for (CounterGroup group : counters) {
+ for (Counter counter : group) {
+ if (counter.getName().equals("HDFS_BYTES_WRITTEN"))
+ outputBytes += counter.getValue();
+ }
+ }
+ }
+ try {
+ entityPS.setLong(1, historyEvent.getFinishTime());
+ entityPS.setInt(2, historyEvent.getFinishedMaps());
+ entityPS.setInt(3, historyEvent.getFinishedReduces());
+ entityPS.setInt(4, historyEvent.getFailedMaps());
+ entityPS.setInt(5, historyEvent.getFailedReduces());
+ entityPS.setLong(6, inputBytes);
+ entityPS.setLong(7, outputBytes);
+ entityPS.setString(8, historyEvent.getJobid().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobid() + " into " + JOB_TABLE, sqle);
+ }
+
+ updateJobStatsAtFinish(historyEvent.getJobid().toString());
+
+ }
+
+ private void processJobInitedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, JobInitedEvent historyEvent) {
+ try {
+ entityPS.setLong(1, historyEvent.getLaunchTime());
+ entityPS.setInt(2, historyEvent.getTotalMaps());
+ entityPS.setInt(3, historyEvent.getTotalReduces());
+ entityPS.setString(4, historyEvent.getStatus());
+ entityPS.setString(5, historyEvent.getJobId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+ }
+ }
+
+ private void processJobStatusChangedEvent(
+ PreparedStatement entityPS,
+ PreparedStatement workflowUpdateNumCompletedPS,
+ LoggingEvent logEvent, JobStatusChangedEvent historyEvent) {
+ try {
+ entityPS.setString(1, historyEvent.getStatus());
+ entityPS.setString(2, historyEvent.getJobId().toString());
+ entityPS.executeUpdate();
+ if ("SUCCESS".equals(historyEvent.getStatus())) {
+ workflowUpdateNumCompletedPS.setLong(1, System.currentTimeMillis());
+ workflowUpdateNumCompletedPS.setString(2,
+ historyEvent.getJobId().toString());
+ workflowUpdateNumCompletedPS.executeUpdate();
+ }
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+ }
+ }
+
+ private void processJobInfoChangeEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, JobInfoChangeEvent historyEvent) {
+ try {
+ entityPS.setLong(1, historyEvent.getSubmitTime());
+ entityPS.setLong(2, historyEvent.getLaunchTime());
+ entityPS.setString(3, historyEvent.getJobId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+ }
+ }
+
+ private void processJobUnsuccessfulEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, JobUnsuccessfulCompletionEvent historyEvent) {
+ try {
+ entityPS.setLong(1, historyEvent.getFinishTime());
+ entityPS.setLong(2, historyEvent.getFinishedMaps());
+ entityPS.setLong(3, historyEvent.getFinishedReduces());
+ entityPS.setString(4, historyEvent.getStatus());
+ entityPS.setString(5, historyEvent.getJobId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for job " +
+ historyEvent.getJobId() + " into " + JOB_TABLE, sqle);
+ }
+
+ updateJobStatsAtFinish(historyEvent.getJobId().toString());
+ }
+
+ private void processTaskStartedEvent(PreparedStatement entityPS,
+ LoggingEvent logEvent, TaskStartedEvent historyEvent) {
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskType().toString());
+ entityPS.setString(3, historyEvent.getSplitLocations());
+ entityPS.setLong(4, historyEvent.getStartTime());
+ entityPS.setString(5, historyEvent.getTaskId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
+ historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+ }
+ }
+
+ private void processTaskFinishedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, TaskFinishedEvent historyEvent) {
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskType().toString());
+ entityPS.setString(3, historyEvent.getTaskStatus());
+ entityPS.setLong(4, historyEvent.getFinishTime());
+ entityPS.setString(5, historyEvent.getTaskId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
+ historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+ }
+ }
+
+ private void processTaskFailedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, TaskFailedEvent historyEvent) {
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskType().toString());
+ entityPS.setString(3, historyEvent.getTaskStatus());
+ entityPS.setLong(4, historyEvent.getFinishTime());
+ entityPS.setString(5, historyEvent.getError());
+ entityPS.setString(6, historyEvent.getFailedAttemptID().toString());
+ entityPS.setString(7, historyEvent.getTaskId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() + " for task " +
+ historyEvent.getTaskId() + " into " + TASK_TABLE, sqle);
+ }
+ }
+
+ private void processTaskAttemptStartedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, TaskAttemptStartedEvent historyEvent) {
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskId().toString());
+ entityPS.setString(3, historyEvent.getTaskType().toString());
+ entityPS.setLong(4, historyEvent.getStartTime());
+ entityPS.setString(5, historyEvent.getTrackerName());
+ entityPS.setString(6, historyEvent.getLocality().toString());
+ entityPS.setString(7, historyEvent.getAvataar().toString());
+ entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() +
+ " for taskAttempt " + historyEvent.getTaskAttemptId() +
+ " into " + TASKATTEMPT_TABLE, sqle);
+ }
+ }
+
+ private void processTaskAttemptFinishedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, TaskAttemptFinishedEvent historyEvent) {
+
+ if (historyEvent.getTaskType() == TaskType.MAP ||
+ historyEvent.getTaskType() == TaskType.REDUCE) {
+ LOG.debug("Ignoring TaskAttemptFinishedEvent for " +
+ historyEvent.getTaskType());
+ return;
+ }
+
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskId().toString());
+ entityPS.setString(3, historyEvent.getTaskType().toString());
+ entityPS.setLong(4, historyEvent.getFinishTime());
+ entityPS.setString(5, historyEvent.getTaskStatus());
+ entityPS.setString(6, historyEvent.getHostname());
+ entityPS.setString(7, historyEvent.getAttemptId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() +
+ " for taskAttempt " + historyEvent.getAttemptId() +
+ " into " + TASKATTEMPT_TABLE, sqle);
+ }
+ }
+
+ private void processTaskAttemptUnsuccessfulEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent,
+ TaskAttemptUnsuccessfulCompletionEvent historyEvent) {
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskId().toString());
+ entityPS.setString(3, historyEvent.getTaskType().toString());
+ entityPS.setLong(4, historyEvent.getFinishTime());
+ entityPS.setString(5, historyEvent.getTaskStatus());
+ entityPS.setString(6, historyEvent.getHostname());
+ entityPS.setString(7, historyEvent.getError());
+ entityPS.setString(8, historyEvent.getTaskAttemptId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() +
+ " for taskAttempt " + historyEvent.getTaskAttemptId() +
+ " into " + TASKATTEMPT_TABLE, sqle);
+ }
+ }
+
+ private void processMapAttemptFinishedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, MapAttemptFinishedEvent historyEvent) {
+
+ if (historyEvent.getTaskType() != TaskType.MAP) {
+ LOG.debug("Ignoring MapAttemptFinishedEvent for " +
+ historyEvent.getTaskType());
+ return;
+ }
+
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskId().toString());
+ entityPS.setString(3, historyEvent.getTaskType().toString());
+ entityPS.setLong(4, historyEvent.getMapFinishTime());
+ entityPS.setLong(5, historyEvent.getFinishTime());
+ entityPS.setString(6, historyEvent.getTaskStatus());
+ entityPS.setString(7, historyEvent.getHostname());
+ entityPS.setString(8, historyEvent.getAttemptId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() +
+ " for taskAttempt " + historyEvent.getAttemptId() +
+ " into " + TASKATTEMPT_TABLE, sqle);
+ }
+ }
+
+
+ private void processReduceAttemptFinishedEvent(
+ PreparedStatement entityPS,
+ LoggingEvent logEvent, ReduceAttemptFinishedEvent historyEvent) {
+ if (historyEvent.getTaskType() != TaskType.REDUCE) {
+ LOG.debug("Ignoring ReduceAttemptFinishedEvent for " +
+ historyEvent.getTaskType());
+ return;
+ }
+
+ try {
+ entityPS.setString(1,
+ historyEvent.getTaskId().getJobID().toString());
+ entityPS.setString(2, historyEvent.getTaskId().toString());
+ entityPS.setString(3, historyEvent.getTaskType().toString());
+ entityPS.setLong(4, historyEvent.getShuffleFinishTime());
+ entityPS.setLong(5, historyEvent.getSortFinishTime());
+ entityPS.setLong(6, historyEvent.getFinishTime());
+ entityPS.setString(7, historyEvent.getTaskStatus());
+ entityPS.setString(8, historyEvent.getHostname());
+ entityPS.setString(9, historyEvent.getAttemptId().toString());
+ entityPS.executeUpdate();
+ } catch (SQLException sqle) {
+ LOG.info("Failed to store " + historyEvent.getEventType() +
+ " for taskAttempt " + historyEvent.getAttemptId() +
+ " into " + TASKATTEMPT_TABLE, sqle);
+ }
+ }
+
+
+ @Override
+ public void update(LoggingEvent originalEvent, Object parsedEvent)
+ throws IOException {
+ try {
+ doUpdates(originalEvent, parsedEvent);
+ } catch (SQLException sqle) {
+ throw new IOException(sqle);
+ }
+ }
+
+}
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/resources/ambari.schema Wed Nov 7 08:49:58 2012
@@ -0,0 +1,70 @@
+CREATE TABLE workflow (
+ workflowId TEXT, workflowName TEXT,
+ parentWorkflowId TEXT,
+ workflowContext TEXT, userName TEXT,
+ startTime BIGINT, lastUpdateTime BIGINT,
+ numJobsTotal INTEGER, numJobsCompleted INTEGER,
+ PRIMARY KEY (workflowId),
+ FOREIGN KEY (parentWorkflowId) REFERENCES workflow(workflowId)
+);
+
+CREATE TABLE job (
+ jobId TEXT, workflowId TEXT, jobName TEXT, workflowEntityName TEXT,
+ userName TEXT, queue TEXT, acls TEXT, confPath TEXT,
+ submitTime BIGINT, launchTime BIGINT, finishTime BIGINT,
+ maps INTEGER, reduces INTEGER, status TEXT, priority TEXT,
+ finishedMaps INTEGER, finishedReduces INTEGER,
+ failedMaps INTEGER, failedReduces INTEGER,
+ mapsRuntime BIGINT, reducesRuntime BIGINT,
+ mapCounters TEXT, reduceCounters TEXT, jobCounters TEXT,
+ inputBytes BIGINT, outputBytes BIGINT,
+ PRIMARY KEY(jobId),
+ FOREIGN KEY(workflowId) REFERENCES workflow(workflowId)
+);
+
+CREATE TABLE task (
+ taskId TEXT, jobId TEXT, taskType TEXT, splits TEXT,
+ startTime BIGINT, finishTime BIGINT, status TEXT, error TEXT, counters TEXT,
+ failedAttempt TEXT,
+ PRIMARY KEY(taskId),
+ FOREIGN KEY(jobId) REFERENCES job(jobId)
+);
+
+CREATE TABLE taskAttempt (
+ taskAttemptId TEXT, taskId TEXT, jobId TEXT, taskType TEXT, taskTracker TEXT,
+ startTime BIGINT, finishTime BIGINT,
+ mapFinishTime BIGINT, shuffleFinishTime BIGINT, sortFinishTime BIGINT,
+ locality TEXT, avataar TEXT,
+ status TEXT, error TEXT, counters TEXT,
+ PRIMARY KEY(taskAttemptId),
+ FOREIGN KEY(jobId) REFERENCES job(jobId),
+ FOREIGN KEY(taskId) REFERENCES task(taskId)
+);
+
+CREATE TABLE hdfsEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ srcPath TEXT,
+ dstPath TEXT,
+ permissions TEXT
+);
+
+CREATE TABLE mapreduceEvent (
+ timestamp BIGINT,
+ userName TEXT,
+ clientIP TEXT,
+ operation TEXT,
+ target TEXT,
+ result TEXT,
+ description TEXT,
+ permissions TEXT
+);
+
+CREATE TABLE clusterEvent (
+ timestamp BIGINT,
+ service TEXT, status TEXT,
+ error TEXT, data TEXT ,
+ host TEXT, rack TEXT
+);
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/main/webapp/WEB-INF/web.xml Wed Nov 7 08:49:58 2012
@@ -0,0 +1,50 @@
+<!DOCTYPE web-app PUBLIC
+ "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN"
+ "http://java.sun.com/dtd/web-app_2_3.dtd" >
+
+<web-app
+ xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
+ version="2.5">
+
+ <context-param>
+ <param-name>eventdb.db.hostname</param-name>
+ <param-value>localhost</param-value>
+ </context-param>
+ <context-param>
+ <param-name>eventdb.db.name</param-name>
+ <param-value>ambari</param-value>
+ </context-param>
+ <context-param>
+ <param-name>eventdb.db.user</param-name>
+ <param-value>dbuser</param-value>
+ </context-param>
+ <context-param>
+ <param-name>eventdb.db.password</param-name>
+ <param-value></param-value>
+ </context-param>
+
+ <servlet>
+ <servlet-name>Workflow JSON Servlet</servlet-name>
+ <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>com.sun.jersey.config.property.packages</param-name>
+ <param-value>org.apache.ambari.eventdb.webservice</param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <!-- Add jsp files here
+ <servlet>
+ <servlet-name></servlet-name>
+ <jsp-file></jsp-file>
+ </servlet>
+ -->
+
+ <servlet-mapping>
+ <servlet-name>Workflow JSON Servlet</servlet-name>
+ <url-pattern>/wf/*</url-pattern>
+ </servlet-mapping>
+
+</web-app>
Added: incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java?rev=1406498&view=auto
==============================================================================
--- incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java (added)
+++ incubator/ambari/branches/AMBARI-666/contrib/ambari-log4j/src/test/java/org/apache/ambari/TestJobHistoryParsing.java Wed Nov 7 08:49:58 2012
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import junit.framework.TestCase;
+
+import org.apache.ambari.eventdb.model.WorkflowContext;
+import org.apache.ambari.eventdb.model.WorkflowDag.WorkflowDagEntry;
+import org.apache.ambari.log4j.hadoop.mapreduce.jobhistory.MapReduceJobHistoryUpdater;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobHistory;
+import org.apache.hadoop.tools.rumen.JobSubmittedEvent;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ *
+ */
+public class TestJobHistoryParsing extends TestCase {
+ static final char LINE_DELIMITER_CHAR = '.';
+ static final char[] charsToEscape = new char[] {'"', '=', LINE_DELIMITER_CHAR};
+ private static final char DELIMITER = ' ';
+ private static final String ID = "WORKFLOW_ID";
+ private static final String NAME = "WORKFLOW_NAME";
+ private static final String NODE = "WORKFLOW_NODE_NAME";
+ private static final String ADJ = "WORKFLOW_ADJACENCIES";
+ private static final String ID_PROP = "mapreduce.workflow.id";
+ private static final String NAME_PROP = "mapreduce.workflow.name";
+ private static final String NODE_PROP = "mapreduce.workflow.node.name";
+ private static final String ADJ_PROP = "mapreduce.workflow.adjacency";
+
+ public void test1() {
+ Map<String,String[]> adj = new HashMap<String,String[]>();
+ adj.put("10", new String[] {"20", "30"});
+ adj.put("20", new String[] {"30"});
+ adj.put("30", new String[] {});
+ test("id_0-1", "something.name", "10", adj);
+ }
+
+ public void test2() {
+ Map<String,String[]> adj = new HashMap<String,String[]>();
+ adj.put("1=0", new String[] {"2 0", "3\"0."});
+ adj.put("2 0", new String[] {"3\"0."});
+ adj.put("3\"0.", new String[] {});
+ test("id_= 0-1", "something.name", "1=0", adj);
+ }
+
+ public void test(String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adjacencies) {
+ Configuration conf = new Configuration();
+ setProperties(conf, workflowId, workflowName, workflowNodeName, adjacencies);
+ String log = log("JOB", new String[] {ID, NAME, NODE, ADJ},
+ new String[] {conf.get(ID_PROP), conf.get(NAME_PROP), conf.get(NODE_PROP), JobHistory.JobInfo.getWorkflowAdjacencies(conf)});
+ ParsedLine line = new ParsedLine(log);
+ JobSubmittedEvent event = new JobSubmittedEvent(null, "", "", 0l, "", null, "", line.get(ID), line.get(NAME), line.get(NODE), line.get(ADJ));
+ WorkflowContext context = MapReduceJobHistoryUpdater.buildWorkflowContext(event);
+ assertEquals("Didn't recover workflowId", workflowId, context.getWorkflowId());
+ assertEquals("Didn't recover workflowName", workflowName, context.getWorkflowName());
+ assertEquals("Didn't recover workflowNodeName", workflowNodeName, context.getWorkflowEntityName());
+ assertEquals("Got incorrect number of adjacencies", adjacencies.size(), context.getWorkflowDag().getEntries().size());
+ for (WorkflowDagEntry entry : context.getWorkflowDag().getEntries()) {
+ String[] sTargets = adjacencies.get(entry.getSource());
+ assertNotNull("No original targets for " + entry.getSource(), sTargets);
+ List<String> dTargets = entry.getTargets();
+ assertEquals("Got incorrect number of targets for " + entry.getSource(), sTargets.length, dTargets.size());
+ for (int i = 0; i < sTargets.length; i++) {
+ assertEquals("Got incorrect target for " + entry.getSource(), sTargets[i], dTargets.get(i));
+ }
+ }
+ }
+
+ private static void setProperties(Configuration conf, String workflowId, String workflowName, String workflowNodeName, Map<String,String[]> adj) {
+ conf.set(ID_PROP, workflowId);
+ conf.set(NAME_PROP, workflowName);
+ conf.set(NODE_PROP, workflowNodeName);
+ for (Entry<String,String[]> entry : adj.entrySet()) {
+ conf.setStrings(ADJ_PROP + "." + entry.getKey(), entry.getValue());
+ }
+ }
+
+ private static String log(String recordType, String[] keys, String[] values) {
+ int length = recordType.length() + keys.length * 4 + 2;
+ for (int i = 0; i < keys.length; i++) {
+ values[i] = StringUtils.escapeString(values[i], StringUtils.ESCAPE_CHAR, charsToEscape);
+ length += values[i].length() + keys[i].toString().length();
+ }
+
+ // We have the length of the buffer, now construct it.
+ StringBuilder builder = new StringBuilder(length);
+ builder.append(recordType);
+ builder.append(DELIMITER);
+ for (int i = 0; i < keys.length; i++) {
+ builder.append(keys[i]);
+ builder.append("=\"");
+ builder.append(values[i]);
+ builder.append("\"");
+ builder.append(DELIMITER);
+ }
+ builder.append(LINE_DELIMITER_CHAR);
+
+ return builder.toString();
+ }
+
+ private static class ParsedLine {
+ static final String KEY = "(\\w+)";
+ static final String VALUE = "([^\"\\\\]*+(?:\\\\.[^\"\\\\]*+)*+)";
+ static final Pattern keyValPair = Pattern.compile(KEY + "=" + "\"" + VALUE + "\"");
+ Map<String,String> props = new HashMap<String,String>();
+ private String type;
+
+ ParsedLine(String fullLine) {
+ int firstSpace = fullLine.indexOf(" ");
+
+ if (firstSpace < 0) {
+ firstSpace = fullLine.length();
+ }
+
+ if (firstSpace == 0) {
+ return; // This is a junk line of some sort
+ }
+
+ type = fullLine.substring(0, firstSpace);
+
+ String propValPairs = fullLine.substring(firstSpace + 1);
+
+ Matcher matcher = keyValPair.matcher(propValPairs);
+
+ while (matcher.find()) {
+ String key = matcher.group(1);
+ String value = matcher.group(2);
+ props.put(key, value);
+ }
+ }
+
+ protected String getType() {
+ return type;
+ }
+
+ protected String get(String key) {
+ return props.get(key);
+ }
+ }
+}