You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/05/22 00:13:15 UTC
[3/3] git commit: TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)
TEZ-1066. Generate events to integrate with YARN timeline server. (hitesh)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/bc657961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/bc657961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/bc657961
Branch: refs/heads/master
Commit: bc65796146fca5d7d6eb49e85e20fda5bade0b57
Parents: 6e07fc7
Author: Hitesh Shah <hi...@apache.org>
Authored: Wed May 21 14:54:08 2014 -0700
Committer: Hitesh Shah <hi...@apache.org>
Committed: Wed May 21 14:54:08 2014 -0700
----------------------------------------------------------------------
BUILDING.txt | 14 +-
pom.xml | 8 +-
.../apache/tez/dag/api/TezConfiguration.java | 19 +-
.../java/org/apache/tez/common/TezUtils.java | 18 +
.../org/apache/tez/dag/app/DAGAppMaster.java | 8 +-
.../apache/tez/dag/app/dag/impl/DAGImpl.java | 9 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 +
.../dag/app/launcher/ContainerLauncherImpl.java | 2 +-
.../apache/tez/dag/history/HistoryEvent.java | 6 +-
.../tez/dag/history/HistoryEventHandler.java | 32 +-
.../apache/tez/dag/history/ats/ATSService.java | 124 ----
.../apache/tez/dag/history/ats/EntityTypes.java | 28 -
.../tez/dag/history/events/AMLaunchedEvent.java | 63 +-
.../tez/dag/history/events/AMStartedEvent.java | 57 +-
.../history/events/ContainerLaunchedEvent.java | 52 +-
.../history/events/ContainerStoppedEvent.java | 60 +-
.../history/events/DAGCommitStartedEvent.java | 8 -
.../dag/history/events/DAGFinishedEvent.java | 58 +-
.../dag/history/events/DAGInitializedEvent.java | 33 +-
.../tez/dag/history/events/DAGStartedEvent.java | 53 +-
.../dag/history/events/DAGSubmittedEvent.java | 81 +--
.../events/TaskAttemptFinishedEvent.java | 51 +-
.../history/events/TaskAttemptStartedEvent.java | 66 +-
.../dag/history/events/TaskFinishedEvent.java | 51 +-
.../dag/history/events/TaskStartedEvent.java | 47 +-
.../events/VertexCommitStartedEvent.java | 8 -
.../VertexDataMovementEventsGeneratedEvent.java | 20 +-
.../dag/history/events/VertexFinishedEvent.java | 49 +-
.../events/VertexGroupCommitFinishedEvent.java | 8 -
.../events/VertexGroupCommitStartedEvent.java | 8 -
.../history/events/VertexInitializedEvent.java | 58 +-
.../events/VertexParallelismUpdatedEvent.java | 22 +-
.../dag/history/events/VertexStartedEvent.java | 46 +-
.../tez/dag/history/logging/EntityTypes.java | 28 +
.../history/logging/HistoryLoggingService.java | 43 ++
.../impl/HistoryEventJsonConversion.java | 633 +++++++++++++++++++
.../impl/SimpleHistoryLoggingService.java | 169 +++++
.../tez/dag/history/utils/ATSConstants.java | 1 +
.../apache/tez/dag/history/utils/DAGUtils.java | 251 ++++++++
.../TestHistoryEventsProtoConversion.java | 17 +-
.../impl/TestHistoryEventJsonConversion.java | 179 ++++++
tez-dist/pom.xml | 13 +
tez-plugins/pom.xml | 50 ++
tez-plugins/tez-yarn-timeline-history/pom.xml | 71 +++
.../logging/ats/ATSHistoryLoggingService.java | 209 ++++++
.../ats/HistoryEventTimelineConversion.java | 462 ++++++++++++++
.../ats/TestATSHistoryLoggingService.java | 108 ++++
.../ats/TestHistoryEventTimelineConversion.java | 179 ++++++
.../java/org/apache/tez/test/TestTezJobs.java | 57 ++
49 files changed, 2736 insertions(+), 903 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/BUILDING.txt
----------------------------------------------------------------------
diff --git a/BUILDING.txt b/BUILDING.txt
index 5245101..4f2f44e 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -20,6 +20,7 @@ Maven main modules:
- tez-mapreduce ...............(Tez mapreduce)
- tez-dag .....................(Tez dag)
- tez-mapreduce-examples ......(Tez mapreduce examples)
+ - tez-plugins .................(Tez plugins)
- tez-tests ...................(Tez tests)
- tez-dist ....................(Tez dist)
@@ -37,7 +38,7 @@ Maven build goals:
* Run clover : mvn test -Pclover [-Dclover.license=${user.home}/clover.license]
* Run Rat : mvn apache-rat:check
* Build javadocs : mvn javadoc:javadoc
- * Build distribution : mvn package[-Dtar][-Dhadoop.version=2.2.0]
+ * Build distribution : mvn package[-Dtar][-Dhadoop.version=2.4.0]
* Visualize state machines : mvn compile -Pvisualize -DskipTests=true
Build options:
@@ -58,11 +59,18 @@ Tests options:
----------------------------------------------------------------------------------
Building against a specific version of hadoop:
-Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher
-For example to build tez against hadoop 3.0.0-SNAPSHOT
+Tez runs on top of Apache Hadoop YARN and requires hadoop version 2.2.0 or higher.
+
+By default, it can be compiled against hadoop versions 2.4.0 and higher by just
+specifying the hadoop.version. For example, to build tez against hadoop 3.0.0-SNAPSHOT:
$ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT
+However, to compile against hadoop versions lower than 2.4.0, the hadoop24 profile needs
+to be disabled:
+
+ $ mvn package -Dtar -Dhadoop.version=2.2.0 -P\!hadoop24
+
To skip Tests and java docs
$ mvn package -Dtar -Dhadoop.version=3.0.0-SNAPSHOT -DskipTests -Dmaven.javadoc.skip=true
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d1d8b6d..6b0b591 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<properties>
<maven.test.redirectTestOutputToFile>true</maven.test.redirectTestOutputToFile>
<clover.license>${user.home}/clover.license</clover.license>
- <hadoop.version>2.2.0</hadoop.version>
+ <hadoop.version>2.4.0</hadoop.version>
<jetty.version>7.6.10.v20130312</jetty.version>
<distMgmtSnapshotsId>apache.snapshots.https</distMgmtSnapshotsId>
<distMgmtSnapshotsName>Apache Development Snapshot Repository</distMgmtSnapshotsName>
@@ -140,6 +140,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.tez</groupId>
+ <artifactId>tez-plugins</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
@@ -326,6 +331,7 @@
<module>tez-mapreduce-examples</module>
<module>tez-tests</module>
<module>tez-dag</module>
+ <module>tez-plugins</module>
<module>tez-dist</module>
<module>docs</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index ba735fa..36fdd59 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -365,9 +365,22 @@ public class TezConfiguration extends Configuration {
@Private
public static final String TEZ_PREWARM_DAG_NAME_PREFIX = "TezPreWarmDAG";
- public static final String YARN_ATS_ENABLED =
- TEZ_PREFIX + "yarn.ats.enabled";
- public static final boolean YARN_ATS_ENABLED_DEFAULT = false;
+ public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS =
+ TEZ_PREFIX + "history.logging.service.class";
+
+ public static final String TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT =
+ "org.apache.tez.dag.history.logging.impl.SimpleHistoryLoggingService";
+
+ public static final String TEZ_SIMPLE_HISTORY_LOGGING_DIR =
+ TEZ_PREFIX + "simple.history.logging.dir";
+ public static final String TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS =
+ TEZ_PREFIX + "simple.history.max.errors";
+ public static final int TEZ_SIMPLE_HISTORY_LOGGING_MAX_ERRORS_DEFAULT = 10;
+
+ public static final String YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS =
+ TEZ_PREFIX + "yarn.ats.event.flush.timeout.millis";
+ public static final long YARN_ATS_EVENT_FLUSH_TIMEOUT_MILLIS_DEFAULT =
+ 3000l;
public static final String DAG_RECOVERY_ENABLED =
TEZ_PREFIX + "dag.recovery.enabled";
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
index b900527..3e3f5eb 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtils.java
@@ -28,6 +28,7 @@ import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.DataFormatException;
@@ -41,6 +42,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.tez.dag.api.TezConfiguration;
@@ -54,6 +57,7 @@ import com.google.protobuf.ByteString;
public class TezUtils {
private static final Log LOG = LogFactory.getLog(TezUtils.class);
+ private static final Random RANDOM = new Random();
public static void addUserSpecifiedTezConfiguration(Configuration conf) throws IOException {
FileInputStream confPBBinaryStream = null;
@@ -329,4 +333,18 @@ public class TezUtils {
}
return bytes;
}
+
+ public static String getContainerLogDir() {
+ String logDirsStr = System.getenv(Environment.LOG_DIRS.name());
+ if (logDirsStr == null || logDirsStr.isEmpty()) {
+ return null;
+ }
+ String[] logDirs = StringUtils.split(logDirsStr, ',');
+ if (logDirs.length == 0) {
+ return null;
+ }
+ int logIndex = RANDOM.nextInt(logDirs.length);
+ return logDirs[logIndex];
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index e16064a..aed9aa7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -416,7 +416,7 @@ public class DAGAppMaster extends AbstractService {
super.serviceInit(conf);
AMLaunchedEvent launchedEvent = new AMLaunchedEvent(appAttemptID,
- startTime, appSubmitTime);
+ startTime, appSubmitTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
new DAGHistoryEvent(launchedEvent));
@@ -1553,7 +1553,8 @@ public class DAGAppMaster extends AbstractService {
DefaultMetricsSystem.initialize("DAGAppMaster");
this.appsStartTime = clock.getTime();
- AMStartedEvent startEvent = new AMStartedEvent(appAttemptID, appsStartTime);
+ AMStartedEvent startEvent = new AMStartedEvent(appAttemptID,
+ appsStartTime, appMasterUgi.getShortUserName());
historyEventHandler.handle(
new DAGHistoryEvent(startEvent));
@@ -1891,7 +1892,8 @@ public class DAGAppMaster extends AbstractService {
// Job name is the same as the app name until we support multiple dags
// for an app later
DAGSubmittedEvent submittedEvent = new DAGSubmittedEvent(newDAG.getID(),
- submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources);
+ submitTime, dagPlan, this.appAttemptID, cumulativeAdditionalResources,
+ newDAG.getUserName());
try {
historyEventHandler.handleCriticalEvent(
new DAGHistoryEvent(newDAG.getID(), submittedEvent));
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index c92d51b..71d92b0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -895,21 +895,22 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
void logJobHistoryFinishedEvent() throws IOException {
this.setFinishTime();
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
- finishTime, DAGState.SUCCEEDED, "", getAllCounters());
+ finishTime, DAGState.SUCCEEDED, "", getAllCounters(),
+ this.userName, this.dagName);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
void logJobHistoryInitedEvent() {
DAGInitializedEvent initEvt = new DAGInitializedEvent(this.dagId,
- this.initTime);
+ this.initTime, this.userName, this.dagName);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, initEvt));
}
void logJobHistoryStartedEvent() {
DAGStartedEvent startEvt = new DAGStartedEvent(this.dagId,
- this.startTime);
+ this.startTime, this.userName, this.dagName);
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(dagId, startEvt));
}
@@ -918,7 +919,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
DAGFinishedEvent finishEvt = new DAGFinishedEvent(dagId, startTime,
clock.getTime(), state,
StringUtils.join(LINE_SEPARATOR, getDiagnostics()),
- getAllCounters());
+ getAllCounters(), this.userName, this.dagName);
this.appContext.getHistoryHandler().handleCriticalEvent(
new DAGHistoryEvent(dagId, finishEvt));
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 9701be4..17257ea 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -56,6 +57,7 @@ import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
index 437d057..8ea7c8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherImpl.java
@@ -174,7 +174,7 @@ public class ContainerLauncherImpl extends AbstractService implements
ContainerLaunchedEvent lEvt = new ContainerLaunchedEvent(
containerID, clock.getTime(), context.getApplicationAttemptId());
context.getHistoryHandler().handle(new DAGHistoryEvent(
- context.getCurrentDAGID(), lEvt));
+ null, lEvt));
this.state = ContainerState.RUNNING;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
index 3f756c0..1ca0d5f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEvent.java
@@ -22,15 +22,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
public interface HistoryEvent {
public HistoryEventType getEventType();
- public JSONObject convertToATSJSON() throws JSONException;
-
public boolean isRecoveryEvent();
public boolean isHistoryEvent();
@@ -38,4 +33,5 @@ public interface HistoryEvent {
public void toProtoStream(OutputStream outputStream) throws IOException;
public void fromProtoStream(InputStream inputStream) throws IOException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
index 4eb094f..82e063a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventHandler.java
@@ -18,28 +18,27 @@
package org.apache.tez.dag.history;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.CompositeService;
+import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
-import org.apache.tez.dag.history.ats.ATSService;
+import org.apache.tez.dag.history.logging.HistoryLoggingService;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
public class HistoryEventHandler extends CompositeService {
private static Log LOG = LogFactory.getLog(HistoryEventHandler.class);
private final AppContext context;
- private boolean yarnATSEnabled;
- private ATSService atsService;
private RecoveryService recoveryService;
private boolean recoveryEnabled;
+ private HistoryLoggingService historyLoggingService;
public HistoryEventHandler(AppContext context) {
super(HistoryEventHandler.class.getName());
@@ -49,14 +48,19 @@ public class HistoryEventHandler extends CompositeService {
@Override
public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing HistoryEventHandler");
- this.yarnATSEnabled = context.getAMConf().getBoolean(TezConfiguration.YARN_ATS_ENABLED,
- TezConfiguration.YARN_ATS_ENABLED_DEFAULT);
+
this.recoveryEnabled = context.getAMConf().getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED,
TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT);
- if (yarnATSEnabled) {
- atsService = new ATSService();
- addService(atsService);
- }
+
+ String historyServiceClassName = context.getAMConf().get(
+ TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS,
+ TezConfiguration.TEZ_HISTORY_LOGGING_SERVICE_CLASS_DEFAULT);
+
+ historyLoggingService =
+ RuntimeUtils.createClazzInstance(historyServiceClassName);
+ historyLoggingService.setAppContext(context);
+ addService(historyLoggingService);
+
if (recoveryEnabled) {
recoveryService = new RecoveryService(context);
addService(recoveryService);
@@ -97,8 +101,8 @@ public class HistoryEventHandler extends CompositeService {
if (recoveryEnabled && event.getHistoryEvent().isRecoveryEvent()) {
recoveryService.handle(event);
}
- if (yarnATSEnabled && event.getHistoryEvent().isHistoryEvent()) {
- atsService.handle(event);
+ if (event.getHistoryEvent().isHistoryEvent()) {
+ historyLoggingService.handle(event);
}
// TODO at some point we should look at removing this once
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
deleted file mode 100644
index 690e850..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/ATSService.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.tez.dag.history.DAGHistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ATSService extends AbstractService {
-
- private static final Log LOG = LogFactory.getLog(ATSService.class);
-
- private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
- new LinkedBlockingQueue<DAGHistoryEvent>();
-
- private final AtomicInteger historyCounter =
- new AtomicInteger(0);
- private String outputFilePrefix;
- private Thread eventHandlingThread;
- private AtomicBoolean stopped = new AtomicBoolean(false);
- private int eventCounter = 0;
- private int eventsProcessed = 0;
- private final Object lock = new Object();
-
- public ATSService() {
- super(ATSService.class.getName());
- }
-
- @Override
- public void serviceInit(Configuration conf) throws Exception {
- LOG.info("Initializing ATSService");
-
- }
-
- @Override
- public void serviceStart() {
- LOG.info("Starting ATSService");
- eventHandlingThread = new Thread(new Runnable() {
- @Override
- public void run() {
- DAGHistoryEvent event;
- while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
-
- // Log the size of the event-queue every so often.
- if (eventCounter != 0 && eventCounter % 1000 == 0) {
- LOG.info("Event queue stats"
- + ", eventsProcessedSinceLastUpdate=" + eventsProcessed
- + ", eventQueueSize=" + eventQueue.size());
- eventCounter = 0;
- eventsProcessed = 0;
- } else {
- ++eventCounter;
- }
-
- try {
- event = eventQueue.take();
- } catch (InterruptedException e) {
- LOG.info("EventQueue take interrupted. Returning");
- return;
- }
-
- synchronized (lock) {
- ++eventsProcessed;
- try {
- handleEvent(event);
- } catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
- LOG.warn("Error handling event", e);
- }
- }
- }
- }
- }, "HistoryEventHandlingThread");
- eventHandlingThread.start();
- }
-
- @Override
- public void serviceStop() {
- LOG.info("Stopping ATSService");
- stopped.set(true);
- if (eventHandlingThread != null) {
- eventHandlingThread.interrupt();
- }
- }
-
- public void handle(DAGHistoryEvent event) {
- eventQueue.add(event);
- }
-
- private void handleEvent(DAGHistoryEvent event) {
- HistoryEventType eventType = event.getHistoryEvent().getEventType();
- try {
- // TODO integrate with ATS
- } catch (Exception e) {
- LOG.warn("Could not handle history event, eventType="
- + eventType, e);
- // TODO handle error as a fatal event or ignore/skip?
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java b/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
deleted file mode 100644
index a7f0208..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/ats/EntityTypes.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.history.ats;
-
-public enum EntityTypes {
- TEZ_APPLICATION_ATTEMPT,
- TEZ_CONTAINER_ID,
- TEZ_DAG_ID,
- TEZ_VERTEX_ID,
- TEZ_TASK_ID,
- TEZ_TASK_ATTEMPT_ID,
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
index 54bc658..049d340 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMLaunchedEvent.java
@@ -18,35 +18,32 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.AMLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class AMLaunchedEvent implements HistoryEvent {
private ApplicationAttemptId applicationAttemptId;
private long launchTime;
private long appSubmitTime;
+ private String user;
public AMLaunchedEvent() {
}
public AMLaunchedEvent(ApplicationAttemptId appAttemptId,
- long launchTime, long appSubmitTime) {
+ long launchTime, long appSubmitTime, String user) {
this.applicationAttemptId = appAttemptId;
this.launchTime = launchTime;
this.appSubmitTime = appSubmitTime;
+ this.user = user;
}
@Override
@@ -55,48 +52,6 @@ public class AMLaunchedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject initEvent = new JSONObject();
- initEvent.put(ATSConstants.TIMESTAMP, launchTime);
- initEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.AM_LAUNCHED.name());
- events.put(initEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info to tag with Tez AM
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.APP_SUBMIT_TIME, appSubmitTime);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
@@ -151,4 +106,8 @@ public class AMLaunchedEvent implements HistoryEvent {
return appSubmitTime;
}
+ public String getUser() {
+ return user;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
index e66141b..aa7b3f2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/AMStartedEvent.java
@@ -18,33 +18,30 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.AMStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class AMStartedEvent implements HistoryEvent {
private ApplicationAttemptId applicationAttemptId;
private long startTime;
+ private String user;
public AMStartedEvent() {
}
public AMStartedEvent(ApplicationAttemptId appAttemptId,
- long startTime) {
+ long startTime, String user) {
this.applicationAttemptId = appAttemptId;
this.startTime = startTime;
+ this.user = user;
}
@Override
@@ -53,43 +50,6 @@ public class AMStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.AM_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -137,5 +97,8 @@ public class AMStartedEvent implements HistoryEvent {
return startTime;
}
+ public String getUser() {
+ return user;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
index 471ddd1..c37b7f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerLaunchedEvent.java
@@ -18,21 +18,16 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerLaunchedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class ContainerLaunchedEvent implements HistoryEvent {
@@ -57,45 +52,6 @@ public class ContainerLaunchedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + containerId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_CONTAINER_ID.name());
-
- JSONArray relatedEntities = new JSONArray();
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- relatedEntities.put(appAttemptEntity);
- relatedEntities.put(containerEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject launchEvent = new JSONObject();
- launchEvent.put(ATSConstants.TIMESTAMP, launchTime);
- launchEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.CONTAINER_LAUNCHED.name());
- events.put(launchEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // TODO add other container info here? or assume AHS will have this?
- // TODO container logs?
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
index a544354..549720a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/ContainerStoppedEvent.java
@@ -18,23 +18,16 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.recovery.records.RecoveryProtos.ContainerStoppedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class ContainerStoppedEvent implements HistoryEvent {
@@ -62,51 +55,6 @@ public class ContainerStoppedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // structure is identical to ContainerLaunchedEvent
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- "tez_" + containerId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_CONTAINER_ID.name());
-
- JSONArray relatedEntities = new JSONArray();
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- relatedEntities.put(appAttemptEntity);
- relatedEntities.put(containerEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject stopEvent = new JSONObject();
- stopEvent.put(ATSConstants.TIMESTAMP, stopTime);
- stopEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.CONTAINER_STOPPED.name());
- events.put(stopEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // TODO add other container info here? or assume AHS will have this?
- // TODO container logs?
-
- // Other info
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.EXIT_STATUS, exitStatus);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
index 627751a..95e8630 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
@@ -51,12 +49,6 @@ public class DAGCommitStartedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index 14381b3..1cfc36e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -22,47 +22,50 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGFinishedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
import com.google.common.primitives.Ints;
import com.google.protobuf.ByteString;
public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(DAGFinishedEvent.class);
+
private TezDAGID dagID;
private long startTime;
private long finishTime;
private DAGState state;
private String diagnostics;
private TezCounters tezCounters;
+ private String user;
+ private String dagName;
public DAGFinishedEvent() {
}
public DAGFinishedEvent(TezDAGID dagId, long startTime,
long finishTime, DAGState state,
- String diagnostics, TezCounters counters) {
+ String diagnostics, TezCounters counters,
+ String user, String dagName) {
this.dagID = dagId;
this.startTime = startTime;
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
this.tezCounters = counters;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -71,40 +74,6 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities not needed as should have been done in
- // dag submission event
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -215,4 +184,11 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
return tezCounters;
}
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 9b001b6..b3a0165 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -18,29 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-// TODO fix class
public class DAGInitializedEvent implements HistoryEvent {
private TezDAGID dagID;
private long initTime;
+ private String user;
+ private String dagName;
public DAGInitializedEvent() {
}
- public DAGInitializedEvent(TezDAGID dagID, long initTime) {
+ public DAGInitializedEvent(TezDAGID dagID, long initTime,
+ String user, String dagName) {
this.dagID = dagID;
this.initTime = initTime;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -49,12 +51,6 @@ public class DAGInitializedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- throw new UnsupportedOperationException();
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -101,4 +97,13 @@ public class DAGInitializedEvent implements HistoryEvent {
public TezDAGID getDagID() {
return dagID;
}
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index a1bcdf2..4e7b880 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -18,31 +18,31 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class DAGStartedEvent implements HistoryEvent {
private TezDAGID dagID;
private long startTime;
+ private String user;
+ private String dagName;
public DAGStartedEvent() {
}
- public DAGStartedEvent(TezDAGID dagID, long startTime) {
+ public DAGStartedEvent(TezDAGID dagID, long startTime,
+ String user, String dagName) {
this.dagID = dagID;
this.startTime = startTime;
+ this.user = user;
+ this.dagName = dagName;
}
@Override
@@ -51,30 +51,6 @@ public class DAGStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities not needed as should have been done in
- // dag submission event
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -120,4 +96,13 @@ public class DAGStartedEvent implements HistoryEvent {
public TezDAGID getDagID() {
return dagID;
}
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getDagName() {
+ return dagName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 18f2205..f04afa4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -23,31 +23,32 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.records.DAGProtos;
+import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGSubmittedProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.utils.ProtoUtils;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
+
public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(DAGSubmittedEvent.class);
+
private TezDAGID dagID;
private long submitTime;
private DAGProtos.DAGPlan dagPlan;
private ApplicationAttemptId applicationAttemptId;
+ private String user;
private Map<String, LocalResource> cumulativeAdditionalLocalResources;
public DAGSubmittedEvent() {
@@ -55,12 +56,14 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
public DAGSubmittedEvent(TezDAGID dagID, long submitTime,
DAGProtos.DAGPlan dagPlan, ApplicationAttemptId applicationAttemptId,
- Map<String, LocalResource> cumulativeAdditionalLocalResources) {
+ Map<String, LocalResource> cumulativeAdditionalLocalResources,
+ String user) {
this.dagID = dagID;
this.submitTime = submitTime;
this.dagPlan = dagPlan;
this.applicationAttemptId = applicationAttemptId;
this.cumulativeAdditionalLocalResources = cumulativeAdditionalLocalResources;
+ this.user = user;
}
@Override
@@ -69,62 +72,6 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY,
- dagID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_DAG_ID.name());
-
- // Related Entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject tezAppEntity = new JSONObject();
- tezAppEntity.put(ATSConstants.ENTITY,
- "tez_" + applicationAttemptId.toString());
- tezAppEntity.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_APPLICATION_ATTEMPT.name());
- JSONObject appEntity = new JSONObject();
- appEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.getApplicationId().toString());
- appEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ID);
- JSONObject appAttemptEntity = new JSONObject();
- appAttemptEntity.put(ATSConstants.ENTITY,
- applicationAttemptId.toString());
- appAttemptEntity.put(ATSConstants.ENTITY_TYPE,
- ATSConstants.APPLICATION_ATTEMPT_ID);
-
- relatedEntities.put(tezAppEntity);
- relatedEntities.put(appEntity);
- relatedEntities.put(appAttemptEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // filters
- JSONObject primaryFilters = new JSONObject();
- primaryFilters.put(ATSConstants.DAG_NAME,
- dagPlan.getName());
- jsonObject.put(ATSConstants.PRIMARY_FILTERS, primaryFilters);
-
- // TODO decide whether this goes into different events,
- // event info or other info.
- JSONArray events = new JSONArray();
- JSONObject submitEvent = new JSONObject();
- submitEvent.put(ATSConstants.TIMESTAMP, submitTime);
- submitEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.DAG_SUBMITTED.name());
- events.put(submitEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info such as dag plan
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.DAG_PLAN,
- DAGUtils.generateSimpleJSONPlan(dagPlan));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -220,4 +167,12 @@ public class DAGSubmittedEvent implements HistoryEvent, SummaryEvent {
return submitTime;
}
+ public DAGPlan getDagPlan() {
+ return dagPlan;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index ecb6818..aa85efb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -18,26 +18,24 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskAttemptFinishedEvent implements HistoryEvent {
+ private static final Log LOG = LogFactory.getLog(TaskAttemptFinishedEvent.class);
+
private TezTaskAttemptID taskAttemptId;
private String vertexName;
private long startTime;
@@ -71,35 +69,6 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_ATTEMPT_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -183,4 +152,8 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
return state;
}
+ public long getStartTime() {
+ return startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index ba91db8..76109ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -18,22 +18,17 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskAttemptStartedEvent implements HistoryEvent {
@@ -67,50 +62,6 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskAttemptId.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE,
- EntityTypes.TEZ_TASK_ATTEMPT_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject nodeEntity = new JSONObject();
- nodeEntity.put(ATSConstants.ENTITY, nodeId.toString());
- nodeEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.NODE_ID);
-
- JSONObject containerEntity = new JSONObject();
- containerEntity.put(ATSConstants.ENTITY, containerId.toString());
- containerEntity.put(ATSConstants.ENTITY_TYPE, ATSConstants.CONTAINER_ID);
-
- JSONObject taskEntity = new JSONObject();
- taskEntity.put(ATSConstants.ENTITY, taskAttemptId.getTaskID().toString());
- taskEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- relatedEntities.put(nodeEntity);
- relatedEntities.put(containerEntity);
- relatedEntities.put(taskEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_ATTEMPT_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.IN_PROGRESS_LOGS_URL, inProgressLogsUrl);
- otherInfo.put(ATSConstants.COMPLETED_LOGS_URL, completedLogsUrl);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -173,4 +124,13 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
public NodeId getNodeId() {
return nodeId;
}
+
+ public String getInProgressLogsUrl() {
+ return inProgressLogsUrl;
+ }
+
+ public String getCompletedLogsUrl() {
+ return completedLogsUrl;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index 713ecd8..0940987 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -18,27 +18,25 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskFinishedEvent implements HistoryEvent {
+ private static final Log LOG = LogFactory.getLog(TaskFinishedEvent.class);
+
private TezTaskID taskID;
private String vertexName;
private long startTime;
@@ -68,34 +66,6 @@ public class TaskFinishedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
-
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -178,4 +148,9 @@ public class TaskFinishedEvent implements HistoryEvent {
public TezTaskAttemptID getSuccessfulAttemptID() {
return successfulAttemptID;
}
+
+ public long getStartTime() {
+ return startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index c2a380b..e93f3b3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -18,19 +18,14 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskStartedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
public class TaskStartedEvent implements HistoryEvent {
@@ -56,40 +51,6 @@ public class TaskStartedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, taskID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_TASK_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, taskID.getVertexID().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject startEvent = new JSONObject();
- startEvent.put(ATSConstants.TIMESTAMP, startTime);
- startEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.TASK_STARTED.name());
- events.put(startEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix schedule/launch time to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.START_TIME, startTime);
- otherInfo.put(ATSConstants.SCHEDULED_TIME, scheduledTime);
-
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
index 387bff1..a7e74e6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
import com.google.protobuf.ByteString;
@@ -53,12 +51,6 @@ public class VertexCommitStartedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 035c9ca..03a3cdf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -18,7 +18,12 @@
package org.apache.tez.dag.history.events;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.EnumSet;
+import java.util.List;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.ProtoConverters;
@@ -36,14 +41,8 @@ import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.EnumSet;
-import java.util.List;
+import com.google.common.collect.Lists;
public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
@@ -79,11 +78,6 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 8321a38..6c18d9f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.app.dag.VertexState;
@@ -29,20 +31,16 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
-import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishStateProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexFinishedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
+ private static final Log LOG = LogFactory.getLog(VertexFinishedEvent.class);
+
private TezVertexID vertexID;
private String vertexName;
private long initRequestedTime;
@@ -83,33 +81,6 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject finishEvent = new JSONObject();
- finishEvent.put(ATSConstants.TIMESTAMP, finishTime);
- finishEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_FINISHED.name());
- events.put(finishEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.FINISH_TIME, finishTime);
- otherInfo.put(ATSConstants.TIME_TAKEN, (finishTime - startTime));
- otherInfo.put(ATSConstants.STATUS, state.name());
- otherInfo.put(ATSConstants.DIAGNOSTICS, diagnostics);
- otherInfo.put(ATSConstants.COUNTERS,
- DAGUtils.convertCountersToJSON(this.tezCounters));
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -198,6 +169,18 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
return tezCounters;
}
+ public VertexStats getVertexStats() {
+ return vertexStats;
+ }
+
+ public String getVertexName() {
+ return vertexName;
+ }
+
+ public long getStartTime() {
+ return startTime;
+ }
+
@Override
public void toSummaryProtoStream(OutputStream outputStream) throws IOException {
VertexFinishStateProto finishStateProto =
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
index 99a5288..4c042e1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitFinishedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitFinishedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEvent {
@@ -54,12 +52,6 @@ public class VertexGroupCommitFinishedEvent implements HistoryEvent, SummaryEven
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
index 04d6276..d7df13a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexGroupCommitStartedEvent.java
@@ -29,8 +29,6 @@ import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexGroupCommitStartedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent {
@@ -54,12 +52,6 @@ public class VertexGroupCommitStartedEvent implements HistoryEvent, SummaryEvent
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- // TODO
- return null;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index e9e4a8c..2ae1b3a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,6 +18,12 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.dag.api.DagTypeConverters;
@@ -26,20 +32,9 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.ats.EntityTypes;
-import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
-import org.codehaus.jettison.json.JSONArray;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.LinkedHashMap;
-import java.util.Map;
public class VertexInitializedEvent implements HistoryEvent {
@@ -75,42 +70,6 @@ public class VertexInitializedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(ATSConstants.ENTITY, vertexID.toString());
- jsonObject.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_VERTEX_ID.name());
-
- // Related entities
- JSONArray relatedEntities = new JSONArray();
- JSONObject vertexEntity = new JSONObject();
- vertexEntity.put(ATSConstants.ENTITY, vertexID.getDAGId().toString());
- vertexEntity.put(ATSConstants.ENTITY_TYPE, EntityTypes.TEZ_DAG_ID.name());
- relatedEntities.put(vertexEntity);
- jsonObject.put(ATSConstants.RELATED_ENTITIES, relatedEntities);
-
- // Events
- JSONArray events = new JSONArray();
- JSONObject initEvent = new JSONObject();
- initEvent.put(ATSConstants.TIMESTAMP, initedTime);
- initEvent.put(ATSConstants.EVENT_TYPE,
- HistoryEventType.VERTEX_INITIALIZED.name());
- events.put(initEvent);
- jsonObject.put(ATSConstants.EVENTS, events);
-
- // Other info
- // TODO fix requested times to be events
- JSONObject otherInfo = new JSONObject();
- otherInfo.put(ATSConstants.VERTEX_NAME, vertexName);
- otherInfo.put(ATSConstants.INIT_REQUESTED_TIME, initRequestedTime);
- otherInfo.put(ATSConstants.INIT_TIME, initedTime);
- otherInfo.put(ATSConstants.NUM_TASKS, numTasks);
- otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, processorName);
- jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
-
- return jsonObject;
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}
@@ -214,4 +173,9 @@ public class VertexInitializedEvent implements HistoryEvent {
public String getProcessorName() {
return processorName;
}
+
+ public String getVertexName() {
+ return vertexName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/bc657961/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
index 43cc787..39748ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -18,6 +18,13 @@
package org.apache.tez.dag.history.events;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.VertexLocationHint;
@@ -26,15 +33,6 @@ import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
public class VertexParallelismUpdatedEvent implements HistoryEvent {
@@ -61,12 +59,6 @@ public class VertexParallelismUpdatedEvent implements HistoryEvent {
}
@Override
- public JSONObject convertToATSJSON() throws JSONException {
- throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
- + " not a History event");
- }
-
- @Override
public boolean isRecoveryEvent() {
return true;
}