You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2014/03/06 00:35:17 UTC
[2/4] TEZ-847. Support basic AM recovery. (hitesh)
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
new file mode 100644
index 0000000..6d5a769
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.DAGCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Event of type {@link HistoryEventType#DAG_COMMIT_STARTED} whose only payload
+ * is a DAG id.
+ * <p>
+ * {@link #isRecoveryEvent()} returns true and {@link #isHistoryEvent()} returns
+ * false. There is no ATS JSON form yet: {@link #convertToATSJSON()} returns null.
+ */
+public class DAGCommitStartedEvent implements HistoryEvent {
+
+ /** DAG this event refers to; null until set by a constructor or fromProto. */
+ private TezDAGID dagID;
+
+ /**
+ * Creates an empty event (dagID is null); presumably used before
+ * fromProtoStream/fromProto populates it.
+ */
+ public DAGCommitStartedEvent() {
+ }
+
+ /** Creates an event for the given DAG. */
+ public DAGCommitStartedEvent(TezDAGID dagID) {
+ this.dagID = dagID;
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.DAG_COMMIT_STARTED;
+ }
+
+ /** Not implemented yet: always returns null. */
+ @Override
+ public JSONObject convertToATSJSON() throws JSONException {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return true;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return false;
+ }
+
+ /**
+ * Converts this event to its protobuf form.
+ * NOTE(review): throws NullPointerException if dagID is null, e.g. on an
+ * instance created with the no-arg constructor and never populated.
+ */
+ public DAGCommitStartedProto toProto() {
+ return DAGCommitStartedProto.newBuilder()
+ .setDagId(dagID.toString())
+ .build();
+ }
+
+ /** Populates this event from its protobuf form, replacing the current dagID. */
+ public void fromProto(DAGCommitStartedProto proto) {
+ this.dagID = TezDAGID.fromString(proto.getDagId());
+ }
+
+ /** Writes this event as a single length-delimited protobuf message. */
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ toProto().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Reads one length-delimited DAGCommitStartedProto from the stream.
+ * NOTE(review): parseDelimitedFrom returns null at end of stream, in which
+ * case fromProto(null) fails with NullPointerException instead of IOException.
+ */
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ DAGCommitStartedProto proto = DAGCommitStartedProto.parseDelimitedFrom(inputStream);
+ fromProto(proto);
+ }
+
+ @Override
+ public String toString() {
+ return "dagID=" + dagID;
+ }
+
+ /** @return the DAG id, or null if the event has not been populated. */
+ public TezDAGID getDagID() {
+ return dagID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
index cc9c3ad..38e0702 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
@@ -43,7 +43,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
private TezDAGID dagID;
private long startTime;
private long finishTime;
- private DAGStatus.State state;
+ // Uses org.apache.tez.dag.app.dag.DAGState rather than the client API's
+ // DAGStatus.State; serialized by ordinal in toProto().
+ private DAGState state;
private String diagnostics;
private TezCounters tezCounters;
@@ -51,7 +51,7 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
public DAGFinishedEvent(TezDAGID dagId, long startTime,
- long finishTime, DAGStatus.State state,
+ long finishTime, DAGState state,
String diagnostics, TezCounters counters) {
this.dagID = dagId;
this.startTime = startTime;
@@ -111,22 +111,33 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
}
public DAGFinishedProto toProto() {
- return DAGFinishedProto.newBuilder()
- .setDagId(dagID.toString())
+ // diagnostics and counters are optional and only set when non-null, because
+ // protobuf builder setters reject null values.
+ // NOTE(review): state is persisted as DAGState.ordinal(), so reordering the
+ // DAGState constants would change how previously written events decode.
+ DAGFinishedProto.Builder builder = DAGFinishedProto.newBuilder();
+
+ builder.setDagId(dagID.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+
+ return builder.build();
}
public void fromProto(DAGFinishedProto proto) {
this.dagID = TezDAGID.fromString(proto.getDagId());
this.finishTime = proto.getFinishTime();
- this.state = DAGStatus.State.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+ this.state = DAGState.values()[proto.getState()];
+ // Counterpart of toProto(): absent optional fields are not read.
+ // NOTE(review): startTime is neither written by toProto() nor read here, so
+ // an event restored this way keeps its existing startTime (presumably 0).
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
}
@Override
@@ -148,9 +159,9 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + ((tezCounters == null) ? "null" :
+ (tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " ")));
}
@Override
@@ -159,4 +170,28 @@ public class DAGFinishedEvent implements HistoryEvent, SummaryEvent {
HistoryEventType.DAG_FINISHED).writeDelimitedTo(outputStream);
}
+ /** @return the DAG finish time recorded in this event. */
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /** @return the final DAG state recorded in this event. */
+ public DAGState getState() {
+ return state;
+ }
+
+ /** @return the id of the DAG this event refers to. */
+ public TezDAGID getDagID() {
+ return dagID;
+ }
+
+ /** @return the start time; NOTE(review): not restored by fromProto(). */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /** @return diagnostics, or null if none were recorded. */
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /** @return counters, or null if none were recorded. */
+ public TezCounters getTezCounters() {
+ return tezCounters;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
index 20479e6..9b001b6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGInitializedEvent.java
@@ -94,4 +94,11 @@ public class DAGInitializedEvent implements HistoryEvent {
fromProto(proto);
}
+ /** @return the DAG initialization time recorded in this event. */
+ public long getInitTime() {
+ return this.initTime;
+ }
+
+ /** @return the id of the DAG this event refers to. */
+ public TezDAGID getDagID() {
+ return dagID;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
index 4574753..a1bcdf2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGStartedEvent.java
@@ -113,4 +113,11 @@ public class DAGStartedEvent implements HistoryEvent {
+ ", startTime=" + startTime;
}
+ /** @return the DAG start time recorded in this event. */
+ public long getStartTime() {
+ return this.startTime;
+ }
+
+ /** @return the id of the DAG this event refers to. */
+ public TezDAGID getDagID() {
+ return dagID;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
index 3d24105..853bea7 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGSubmittedEvent.java
@@ -179,4 +179,15 @@ import java.io.OutputStream;
return this.dagPlan;
}
+ /** @return the id of the submitted DAG. */
+ public TezDAGID getDagID() {
+ return dagID;
+ }
+
+ /** @return the application attempt id recorded with this submission. */
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ /** @return the submission time recorded in this event. */
+ public long getSubmitTime() {
+ return submitTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 97f3be3..ecb6818 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -110,22 +110,31 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
}
public TaskAttemptFinishedProto toProto() {
- return TaskAttemptFinishedProto.newBuilder()
- .setTaskAttemptId(taskAttemptId.toString())
+ // diagnostics and counters are optional: protobuf setters reject null, so
+ // they are only set when present.
+ TaskAttemptFinishedProto.Builder builder =
+ TaskAttemptFinishedProto.newBuilder();
+ builder.setTaskAttemptId(taskAttemptId.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ return builder.build();
}
public void fromProto(TaskAttemptFinishedProto proto) {
this.taskAttemptId = TezTaskAttemptID.fromString(proto.getTaskAttemptId());
this.finishTime = proto.getFinishTime();
this.state = TaskAttemptState.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ // Optional fields are only read when present in the proto.
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
+ }
}
@Override
@@ -149,9 +158,29 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + (tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
+ }
+
+ /** @return the id of the task attempt this event refers to. */
+ public TezTaskAttemptID getTaskAttemptID() {
+ return taskAttemptId;
+ }
+
+ // NOTE(review): named getCounters() here but getTezCounters() in
+ // DAGFinishedEvent, TaskFinishedEvent and VertexFinishedEvent.
+ /** @return counters, or null if none were recorded. */
+ public TezCounters getCounters() {
+ return tezCounters;
+ }
+
+ /** @return diagnostics, or null if none were recorded. */
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /** @return the attempt finish time recorded in this event. */
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /** @return the final attempt state recorded in this event. */
+ public TaskAttemptState getState() {
+ return state;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
index 11a1c62..ba91db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptStartedEvent.java
@@ -112,7 +112,7 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return false;
+ return true;
}
@Override
@@ -158,4 +158,19 @@ public class TaskAttemptStartedEvent implements HistoryEvent {
+ ", completedLogs=" + completedLogsUrl;
}
+ /** @return the id of the task attempt this event refers to. */
+ public TezTaskAttemptID getTaskAttemptID() {
+ return this.taskAttemptId;
+ }
+
+ /** @return the attempt start time recorded in this event. */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /** @return the container id recorded for this attempt. */
+ public ContainerId getContainerId() {
+ return containerId;
+ }
+
+ /** @return the node id recorded for this attempt. */
+ public NodeId getNodeId() {
+ return nodeId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
index dac2f9a..74c804c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskFinishedEvent.java
@@ -26,6 +26,7 @@ import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskFinishedProto;
import org.codehaus.jettison.json.JSONArray;
@@ -44,9 +45,14 @@ public class TaskFinishedEvent implements HistoryEvent {
private long finishTime;
private TaskState state;
private TezCounters tezCounters;
+ // Successful attempt recorded for this task; may be null, only serialized when set.
+ private TezTaskAttemptID successfulAttemptID;
public TaskFinishedEvent(TezTaskID taskID,
String vertexName, long startTime, long finishTime,
+ TezTaskAttemptID successfulAttemptID,
TaskState state, TezCounters counters) {
+ // Without this assignment the field stays null and toProto() never persists it.
+ this.successfulAttemptID = successfulAttemptID;
this.vertexName = vertexName;
this.taskID = taskID;
@@ -103,20 +109,33 @@ public class TaskFinishedEvent implements HistoryEvent {
}
public TaskFinishedProto toProto() {
- return TaskFinishedProto.newBuilder()
- .setTaskId(taskID.toString())
+ TaskFinishedProto.Builder builder = TaskFinishedProto.newBuilder();
+ builder.setTaskId(taskID.toString())
.setState(state.ordinal())
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+ // Optional fields: protobuf setters reject null, so only set what is present.
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ if (successfulAttemptID != null) {
+ builder.setSuccessfulTaskAttemptId(successfulAttemptID.toString());
+ }
+ return builder.build();
}
public void fromProto(TaskFinishedProto proto) {
this.taskID = TezTaskID.fromString(proto.getTaskId());
this.finishTime = proto.getFinishTime();
this.state = TaskState.values()[proto.getState()];
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+ // Optional fields are only read when present in the proto.
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
+ if (proto.hasSuccessfulTaskAttemptId()) {
+ this.successfulAttemptID =
+ TezTaskAttemptID.fromString(proto.getSuccessfulTaskAttemptId());
+ }
}
@Override
@@ -138,9 +157,32 @@ public class TaskFinishedEvent implements HistoryEvent {
+ ", finishTime=" + finishTime
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", successfulAttemptID=" + (successfulAttemptID == null ? "null" :
+ successfulAttemptID.toString())
+ + ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
}
+ public TezTaskID getTaskID() {
+ return taskID;
+ }
+
+ public TaskState getState() {
+ return state;
+ }
+
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /** @return the task counters, or null if none were recorded. */
+ public TezCounters getTezCounters() {
+ return tezCounters;
+ }
+
+ /** @return the successful attempt id recorded for this task, or null if none. */
+ public TezTaskAttemptID getSuccessfulAttemptID() {
+ return successfulAttemptID;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
index 9efa357..c2a380b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskStartedEvent.java
@@ -91,7 +91,7 @@ public class TaskStartedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return false;
+ return true;
}
@Override
@@ -132,4 +132,16 @@ public class TaskStartedEvent implements HistoryEvent {
+ ", launchTime=" + startTime;
}
+ /** @return the id of the task this event refers to. */
+ public TezTaskID getTaskID() {
+ return taskID;
+ }
+
+ /** @return the scheduled time recorded in this event. */
+ public long getScheduledTime() {
+ return scheduledTime;
+ }
+
+ /** @return the start time (printed as "launchTime" by toString()). */
+ public long getStartTime() {
+ return startTime;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
new file mode 100644
index 0000000..564f9ed
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexCommitStartedEvent.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexCommitStartedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Event of type {@link HistoryEventType#VERTEX_COMMIT_STARTED} whose only
+ * payload is a vertex id.
+ * <p>
+ * {@link #isRecoveryEvent()} returns true and {@link #isHistoryEvent()} returns
+ * false. There is no ATS JSON form yet: {@link #convertToATSJSON()} returns null.
+ */
+public class VertexCommitStartedEvent implements HistoryEvent {
+
+ /** Vertex this event refers to; null until set by a constructor or fromProto. */
+ private TezVertexID vertexID;
+
+ /**
+ * Creates an empty event (vertexID is null); presumably used before
+ * fromProtoStream/fromProto populates it.
+ */
+ public VertexCommitStartedEvent() {
+ }
+
+ /** Creates an event for the given vertex. */
+ public VertexCommitStartedEvent(TezVertexID vertexId) {
+ this.vertexID = vertexId;
+ }
+
+ @Override
+ public HistoryEventType getEventType() {
+ return HistoryEventType.VERTEX_COMMIT_STARTED;
+ }
+
+ /** Not implemented yet: always returns null. */
+ @Override
+ public JSONObject convertToATSJSON() throws JSONException {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public boolean isRecoveryEvent() {
+ return true;
+ }
+
+ @Override
+ public boolean isHistoryEvent() {
+ return false;
+ }
+
+ /**
+ * Converts this event to its protobuf form.
+ * NOTE(review): throws NullPointerException if vertexID is null, e.g. on an
+ * instance created with the no-arg constructor and never populated.
+ */
+ public VertexCommitStartedProto toProto() {
+ return VertexCommitStartedProto.newBuilder()
+ .setVertexId(vertexID.toString())
+ .build();
+ }
+
+ /** Populates this event from its protobuf form, replacing the current vertexID. */
+ public void fromProto(VertexCommitStartedProto proto) {
+ this.vertexID = TezVertexID.fromString(proto.getVertexId());
+ }
+
+ /** Writes this event as a single length-delimited protobuf message. */
+ @Override
+ public void toProtoStream(OutputStream outputStream) throws IOException {
+ toProto().writeDelimitedTo(outputStream);
+ }
+
+ /**
+ * Reads one length-delimited VertexCommitStartedProto from the stream.
+ * NOTE(review): parseDelimitedFrom returns null at end of stream, in which
+ * case fromProto(null) fails with NullPointerException instead of IOException.
+ */
+ @Override
+ public void fromProtoStream(InputStream inputStream) throws IOException {
+ VertexCommitStartedProto proto = VertexCommitStartedProto.parseDelimitedFrom(inputStream);
+ fromProto(proto);
+ }
+
+ @Override
+ public String toString() {
+ return "vertexId=" + vertexID;
+ }
+
+ /** @return the vertex id, or null if the event has not been populated. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
index 7c2a16f..035c9ca 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexDataMovementEventsGeneratedEvent.java
@@ -206,4 +206,12 @@ public class VertexDataMovementEventsGeneratedEvent implements HistoryEvent {
}
+ /** @return the vertex id recorded in this event. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the recorded events; this is the internal list, not a copy. */
+ public List<TezEvent> getTezEvents() {
+ return this.events;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index 0f2b8a1..2366eb1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -20,7 +20,7 @@ package org.apache.tez.dag.history.events;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
-import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
@@ -45,13 +45,13 @@ public class VertexFinishedEvent implements HistoryEvent {
private long startRequestedTime;
private long startTime;
private long finishTime;
- private VertexStatus.State state;
+ // Uses org.apache.tez.dag.app.dag.VertexState rather than the client API's
+ // VertexStatus.State; serialized by ordinal in toProto().
+ private VertexState state;
private String diagnostics;
private TezCounters tezCounters;
public VertexFinishedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime, long startRequestedTime, long startedTime, long finishTime,
- VertexStatus.State state, String diagnostics,
+ VertexState state, String diagnostics,
TezCounters counters) {
this.vertexName = vertexName;
this.vertexID = vertexId;
@@ -111,24 +111,32 @@ public class VertexFinishedEvent implements HistoryEvent {
}
public VertexFinishedProto toProto() {
- return VertexFinishedProto.newBuilder()
- .setVertexName(vertexName)
+ // diagnostics and counters are optional and only set when non-null, because
+ // protobuf builder setters reject null values.
+ // NOTE(review): state is persisted as VertexState.ordinal(), so reordering the
+ // VertexState constants would change how previously written events decode.
+ VertexFinishedProto.Builder builder = VertexFinishedProto.newBuilder();
+ builder.setVertexName(vertexName)
.setVertexId(vertexID.toString())
.setState(state.ordinal())
- .setDiagnostics(diagnostics)
- .setFinishTime(finishTime)
- .setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters))
- .build();
+ .setFinishTime(finishTime);
+ if (diagnostics != null) {
+ builder.setDiagnostics(diagnostics);
+ }
+ if (tezCounters != null) {
+ builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
+ }
+ return builder.build();
}
public void fromProto(VertexFinishedProto proto) {
this.vertexName = proto.getVertexName();
this.vertexID = TezVertexID.fromString(proto.getVertexId());
this.finishTime = proto.getFinishTime();
- this.state = VertexStatus.State.values()[proto.getState()];
- this.diagnostics = proto.getDiagnostics();
- this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
- proto.getCounters());
+ this.state = VertexState.values()[proto.getState()];
+ // Counterpart of toProto(): absent optional fields are not read.
+ if (proto.hasDiagnostics()) {
+ this.diagnostics = proto.getDiagnostics();
+ }
+ if (proto.hasCounters()) {
+ this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
+ proto.getCounters());
+ }
}
@Override
@@ -154,9 +162,28 @@ public class VertexFinishedEvent implements HistoryEvent {
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ ", diagnostics=" + diagnostics
- + ", counters="
- + tezCounters.toString()
- .replaceAll("\\n", ", ").replaceAll("\\s+", " ");
+ + ", counters=" + ( tezCounters == null ? "null" :
+ tezCounters.toString()
+ .replaceAll("\\n", ", ").replaceAll("\\s+", " "));
}
+ /** @return the id of the vertex this event refers to. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the final vertex state recorded in this event. */
+ public VertexState getState() {
+ return this.state;
+ }
+
+ /** @return the vertex finish time recorded in this event. */
+ public long getFinishTime() {
+ return this.finishTime;
+ }
+
+ /** @return diagnostics, or null if none were recorded. */
+ public String getDiagnostics() {
+ return diagnostics;
+ }
+
+ /** @return counters, or null if none were recorded. */
+ public TezCounters getTezCounters() {
+ return tezCounters;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 21c7587..e9e4a8c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -18,12 +18,19 @@
package org.apache.tez.dag.history.events;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.RootInputLeafOutputDescriptor;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.ats.EntityTypes;
import org.apache.tez.dag.history.utils.ATSConstants;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexInitializedProto;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
@@ -31,28 +38,35 @@ import org.codehaus.jettison.json.JSONObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedHashMap;
+import java.util.Map;
public class VertexInitializedEvent implements HistoryEvent {
+ // NOTE(review): LOG is not referenced anywhere in this change; drop it (and the
+ // Log/LogFactory imports) unless a later change needs it.
+ private static final Log LOG = LogFactory.getLog(VertexInitializedEvent.class);
+
private TezVertexID vertexID;
private String vertexName;
private long initRequestedTime;
private long initedTime;
- private long numTasks;
+ private int numTasks;
private String processorName;
+ // Additional root inputs recorded with this event; may be null when there are
+ // none. fromProto() keys the map by each input's entity name.
+ private Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs;
public VertexInitializedEvent() {
}
public VertexInitializedEvent(TezVertexID vertexId,
String vertexName, long initRequestedTime, long initedTime,
- long numTasks, String processorName) {
+ int numTasks, String processorName,
+ Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> additionalInputs) {
this.vertexName = vertexName;
this.vertexID = vertexId;
this.initRequestedTime = initRequestedTime;
this.initedTime = initedTime;
this.numTasks = numTasks;
this.processorName = processorName;
+ this.additionalInputs = additionalInputs;
}
@Override
@@ -107,8 +121,23 @@ public class VertexInitializedEvent implements HistoryEvent {
}
public RecoveryProtos.VertexInitializedProto toProto() {
- return RecoveryProtos.VertexInitializedProto.newBuilder()
- .setVertexId(vertexID.toString())
+ // Each additional input is written as a RootInputLeafOutputProto. Only the map
+ // values are serialized, so custom keys are not preserved; fromProto() re-keys
+ // by entity name (presumably the same key callers use - confirm).
+ VertexInitializedProto.Builder builder = VertexInitializedProto.newBuilder();
+ if (additionalInputs != null
+ && !additionalInputs.isEmpty()) {
+ for (RootInputLeafOutputDescriptor<InputDescriptor> input :
+ additionalInputs.values()) {
+ RootInputLeafOutputProto.Builder inputBuilder
+ = RootInputLeafOutputProto.newBuilder();
+ inputBuilder.setName(input.getEntityName());
+ if (input.getInitializerClassName() != null) {
+ inputBuilder.setInitializerClassName(input.getInitializerClassName());
+ }
+ inputBuilder.setEntityDescriptor(
+ DagTypeConverters.convertToDAGPlan(input.getDescriptor()));
+ builder.addInputs(inputBuilder.build());
+ }
+ }
+ return builder.setVertexId(vertexID.toString())
.setVertexName(vertexName)
.setInitRequestedTime(initRequestedTime)
.setInitTime(initedTime)
@@ -122,6 +151,20 @@ public class VertexInitializedEvent implements HistoryEvent {
this.initRequestedTime = proto.getInitRequestedTime();
this.initedTime = proto.getInitTime();
this.numTasks = proto.getNumTasks();
+ // LinkedHashMap keeps the inputs in the order they appear in the proto; with no
+ // inputs, additionalInputs is left untouched (null for a fresh instance).
+ if (proto.getInputsCount() > 0) {
+ this.additionalInputs =
+ new LinkedHashMap<String, RootInputLeafOutputDescriptor<InputDescriptor>>();
+ for (RootInputLeafOutputProto inputProto : proto.getInputsList()) {
+ RootInputLeafOutputDescriptor<InputDescriptor> input =
+ new RootInputLeafOutputDescriptor<InputDescriptor>(
+ inputProto.getName(),
+ DagTypeConverters.convertInputDescriptorFromDAGPlan(
+ inputProto.getEntityDescriptor()),
+ inputProto.hasInitializerClassName() ?
+ inputProto.getInitializerClassName() : null);
+ additionalInputs.put(input.getEntityName(), input);
+ }
+ }
}
@Override
@@ -143,7 +186,32 @@ public class VertexInitializedEvent implements HistoryEvent {
+ ", initRequestedTime=" + initRequestedTime
+ ", initedTime=" + initedTime
+ ", numTasks=" + numTasks
- + ", processorName=" + processorName;
+ + ", processorName=" + processorName
+ + ", additionalInputsCount="
+ + (additionalInputs != null ? additionalInputs.size() : 0);
+ }
+
+ /** @return the id of the vertex this event refers to. */
+ public TezVertexID getVertexID() {
+ return this.vertexID;
+ }
+
+ /** @return the time initialization was requested, as recorded in this event. */
+ public long getInitRequestedTime() {
+ return initRequestedTime;
+ }
+
+ /** @return the time initialization completed, as recorded in this event. */
+ public long getInitedTime() {
+ return initedTime;
+ }
+
+ /** @return the number of tasks recorded for the vertex. */
+ public int getNumTasks() {
+ return numTasks;
}
+ /** @return the recorded additional inputs (internal map, not a copy), or null. */
+ public Map<String, RootInputLeafOutputDescriptor<InputDescriptor>> getAdditionalInputs() {
+ return additionalInputs;
+ }
+
+ /** @return the processor name recorded for the vertex. */
+ public String getProcessorName() {
+ return processorName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
new file mode 100644
index 0000000..43cc787
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexParallelismUpdatedEvent.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.history.events;
+
+import org.apache.tez.dag.api.DagTypeConverters;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TezVertexID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.EdgeManagerDescriptorProto;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.VertexParallelismUpdatedProto;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * Recovery event recording a runtime change to a vertex's parallelism: the new
+ * task count, an optional {@link VertexLocationHint}, and optional replacement
+ * {@link EdgeManagerDescriptor}s for the vertex's source edges.
+ *
+ * <p>This event is written only to the recovery log
+ * ({@link #isRecoveryEvent()} returns true, {@link #isHistoryEvent()} returns
+ * false), so {@link #convertToATSJSON()} is unsupported.
+ */
+public class VertexParallelismUpdatedEvent implements HistoryEvent {
+
+  private TezVertexID vertexID;
+  private int numTasks;
+  /** Null when no location hint was supplied. */
+  private VertexLocationHint vertexLocationHint;
+  /**
+   * Replacement edge managers keyed by name (serialized as {@code edge_name});
+   * null when none were supplied.
+   */
+  private Map<String, EdgeManagerDescriptor> sourceEdgeManagers;
+
+  /**
+   * No-arg constructor for deserialization; fields are filled in by
+   * {@link #fromProto} / {@link #fromProtoStream}.
+   */
+  public VertexParallelismUpdatedEvent() {
+  }
+
+  /**
+   * @param vertexID vertex whose parallelism changed; must be non-null before
+   *     {@link #toProto()} is called
+   * @param numTasks new number of tasks
+   * @param vertexLocationHint new location hint, or null
+   * @param sourceEdgeManagers replacement edge managers keyed by name, or null
+   */
+  public VertexParallelismUpdatedEvent(TezVertexID vertexID,
+      int numTasks, VertexLocationHint vertexLocationHint,
+      Map<String, EdgeManagerDescriptor> sourceEdgeManagers) {
+    this.vertexID = vertexID;
+    this.numTasks = numTasks;
+    this.vertexLocationHint = vertexLocationHint;
+    this.sourceEdgeManagers = sourceEdgeManagers;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.VERTEX_PARALLELISM_UPDATED;
+  }
+
+  /** Not supported: this event is never published as a history event. */
+  @Override
+  public JSONObject convertToATSJSON() throws JSONException {
+    throw new UnsupportedOperationException("VertexParallelismUpdatedEvent"
+        + " not a History event");
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  /**
+   * Converts this event to its protobuf form. The location hint and edge
+   * manager entries are omitted when the corresponding member is null.
+   *
+   * @return the proto representation of this event
+   * @throws NullPointerException if {@code vertexID} has not been set
+   */
+  public VertexParallelismUpdatedProto toProto() {
+    VertexParallelismUpdatedProto.Builder builder =
+        VertexParallelismUpdatedProto.newBuilder();
+    builder.setVertexId(vertexID.toString())
+        .setNumTasks(numTasks);
+    if (vertexLocationHint != null) {
+      builder.setVertexLocationHint(
+          DagTypeConverters.convertVertexLocationHintToProto(
+              this.vertexLocationHint));
+    }
+    if (sourceEdgeManagers != null) {
+      for (Entry<String, EdgeManagerDescriptor> entry :
+          sourceEdgeManagers.entrySet()) {
+        EdgeManagerDescriptorProto.Builder edgeMgrBuilder =
+            EdgeManagerDescriptorProto.newBuilder();
+        edgeMgrBuilder.setEdgeName(entry.getKey());
+        edgeMgrBuilder.setEntityDescriptor(
+            DagTypeConverters.convertToDAGPlan(entry.getValue()));
+        builder.addEdgeManagerDescriptors(edgeMgrBuilder.build());
+      }
+    }
+    return builder.build();
+  }
+
+  /**
+   * Populates this event from its protobuf form. The location hint and edge
+   * manager map are only assigned when present in the proto.
+   *
+   * @param proto the proto to read from
+   */
+  public void fromProto(VertexParallelismUpdatedProto proto) {
+    this.vertexID = TezVertexID.fromString(proto.getVertexId());
+    this.numTasks = proto.getNumTasks();
+    if (proto.hasVertexLocationHint()) {
+      this.vertexLocationHint =
+          DagTypeConverters.convertVertexLocationHintFromProto(
+              proto.getVertexLocationHint());
+    }
+    if (proto.getEdgeManagerDescriptorsCount() > 0) {
+      this.sourceEdgeManagers = new HashMap<String, EdgeManagerDescriptor>(
+          proto.getEdgeManagerDescriptorsCount());
+      for (EdgeManagerDescriptorProto edgeManagerProto :
+          proto.getEdgeManagerDescriptorsList()) {
+        EdgeManagerDescriptor edgeManagerDescriptor =
+            DagTypeConverters.convertEdgeManagerDescriptorFromDAGPlan(
+                edgeManagerProto.getEntityDescriptor());
+        sourceEdgeManagers.put(edgeManagerProto.getEdgeName(),
+            edgeManagerDescriptor);
+      }
+    }
+  }
+
+  /** Writes this event to {@code outputStream} as a length-delimited proto. */
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  /**
+   * Reads one length-delimited proto from {@code inputStream} and populates
+   * this event from it.
+   *
+   * @throws IOException if the stream is already at EOF or cannot be parsed
+   */
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    VertexParallelismUpdatedProto proto =
+        VertexParallelismUpdatedProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      // parseDelimitedFrom returns null (rather than throwing) at EOF; surface
+      // that as an IOException instead of an NPE inside fromProto.
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  @Override
+  public String toString() {
+    return "vertexId=" + vertexID
+        + ", numTasks=" + numTasks
+        + ", vertexLocationHint=" +
+        (vertexLocationHint == null? "null" : vertexLocationHint)
+        + ", edgeManagersCount=" +
+        (sourceEdgeManagers == null? "null" : sourceEdgeManagers.size());
+  }
+
+  /** Returns the ID of the vertex this event refers to. */
+  public TezVertexID getVertexID() {
+    return this.vertexID;
+  }
+
+  /** Returns the recorded task count. */
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+  /** Returns the recorded location hint, or null if none was recorded. */
+  public VertexLocationHint getVertexLocationHint() {
+    return vertexLocationHint;
+  }
+
+  /**
+   * Returns the recorded edge managers keyed by name, or null if none were
+   * recorded. The returned map is the internal instance, not a copy.
+   */
+  public Map<String, EdgeManagerDescriptor> getSourceEdgeManagers() {
+    return sourceEdgeManagers;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
index 6bb383d..e6023f1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexStartedEvent.java
@@ -88,7 +88,7 @@ public class VertexStartedEvent implements HistoryEvent {
@Override
public boolean isRecoveryEvent() {
- return false;
+ return true;
}
@Override
@@ -128,4 +128,16 @@ public class VertexStartedEvent implements HistoryEvent {
+ ", startedTime=" + startTime;
}
+  /** Returns the vertex ID carried by this start event. */
+  public TezVertexID getVertexID() {
+    return vertexID;
+  }
+
+  /** Returns the recorded start-requested timestamp. */
+  public long getStartRequestedTime() {
+    return this.startRequestedTime;
+  }
+
+  /** Returns the recorded start timestamp (printed as startedTime). */
+  public long getStartTime() {
+    return this.startTime;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
index 807ad81..5986657 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/recovery/RecoveryService.java
@@ -34,9 +34,11 @@ import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -49,6 +51,7 @@ public class RecoveryService extends AbstractService {
private LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
new LinkedBlockingQueue<DAGHistoryEvent>();
private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
+ private Set<TezDAGID> skippedDAGs = new HashSet<TezDAGID>();
private Thread eventHandlingThread;
private AtomicBoolean stopped = new AtomicBoolean(false);
@@ -60,9 +63,12 @@ public class RecoveryService extends AbstractService {
Path recoveryPath;
Map<TezDAGID, FSDataOutputStream> outputStreamMap = new
HashMap<TezDAGID, FSDataOutputStream>();
- // FSDataOutputStream metaInfoStream;
private int bufferSize;
private FSDataOutputStream summaryStream;
+ private int unflushedEventsCount = 0;
+ private long lastFlushTime = -1;
+ private int maxUnflushedEvents;
+ private int flushInterval;
public RecoveryService(AppContext appContext) {
super(RecoveryService.class.getName());
@@ -76,11 +82,17 @@ public class RecoveryService extends AbstractService {
recoveryDirFS = FileSystem.get(recoveryPath.toUri(), conf);
bufferSize = conf.getInt(TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE,
TezConfiguration.DAG_RECOVERY_FILE_IO_BUFFER_SIZE_DEFAULT);
+
+ flushInterval = conf.getInt(TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS,
+ TezConfiguration.DAG_RECOVERY_FLUSH_INTERVAL_SECS_DEFAULT);
+ maxUnflushedEvents = conf.getInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS,
+ TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS_DEFAULT);
}
@Override
public void serviceStart() {
LOG.info("Starting RecoveryService");
+ lastFlushTime = appContext.getClock().getTime();
eventHandlingThread = new Thread(new Runnable() {
@Override
public void run() {
@@ -128,18 +140,21 @@ public class RecoveryService extends AbstractService {
if (eventHandlingThread != null) {
eventHandlingThread.interrupt();
}
+
if (summaryStream != null) {
try {
- summaryStream.flush();
+ LOG.info("Closing Summary Stream");
+ summaryStream.hsync();
summaryStream.close();
} catch (IOException ioe) {
LOG.warn("Error when closing summary stream", ioe);
}
}
- for (FSDataOutputStream outputStream : outputStreamMap.values()) {
+ for (Entry<TezDAGID, FSDataOutputStream> entry : outputStreamMap.entrySet()) {
try {
- outputStream.flush();
- outputStream.close();
+ LOG.info("Closing Output Stream for DAG " + entry.getKey());
+ entry.getValue().hsync();
+ entry.getValue().close();
} catch (IOException ioe) {
LOG.warn("Error when closing output stream", ioe);
}
@@ -152,38 +167,44 @@ public class RecoveryService extends AbstractService {
+ event.getHistoryEvent().getEventType());
return;
}
+ HistoryEventType eventType = event.getHistoryEvent().getEventType();
if (!started.get()) {
+ LOG.warn("Adding event of type " + eventType
+ + " to queue as service not started");
eventQueue.add(event);
return;
}
- HistoryEventType eventType = event.getHistoryEvent().getEventType();
+
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
|| eventType.equals(HistoryEventType.DAG_FINISHED)) {
// handle submissions and completion immediately
synchronized (lock) {
try {
handleEvent(event);
- summaryStream.flush();
+ summaryStream.hsync();
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
- outputStreamMap.get(event.getDagID()).flush();
+ if (outputStreamMap.containsKey(event.getDagID())) {
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime(), true);
+ }
} else if (eventType.equals(HistoryEventType.DAG_FINISHED)) {
completedDAGs.add(event.getDagID());
if (outputStreamMap.containsKey(event.getDagID())) {
try {
- outputStreamMap.get(event.getDagID()).flush();
+ doFlush(outputStreamMap.get(event.getDagID()),
+ appContext.getClock().getTime(), true);
outputStreamMap.get(event.getDagID()).close();
outputStreamMap.remove(event.getDagID());
} catch (IOException ioe) {
LOG.warn("Error when trying to flush/close recovery file for"
+ " dag, dagId=" + event.getDagID());
+ // FIXME handle error ?
}
- } else {
- // TODO this is an error
}
}
} catch (Exception e) {
- // TODO handle failures - treat as fatal or ignore?
- LOG.warn("Error handling recovery event", e);
+ // FIXME handle failures
+ LOG.warn("Error handling recovery event", e);
}
}
LOG.info("DAG completed"
@@ -191,6 +212,9 @@ public class RecoveryService extends AbstractService {
+ ", queueSize=" + eventQueue.size());
} else {
// All other events just get queued
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queueing Recovery event of type " + eventType.name());
+ }
eventQueue.add(event);
}
}
@@ -206,27 +230,42 @@ public class RecoveryService extends AbstractService {
// AM event
// anything to be done?
// TODO
+ LOG.info("Skipping Recovery Event as DAG is null"
+ + ", eventType=" + event.getHistoryEvent().getEventType());
return;
}
TezDAGID dagID = event.getDagID();
- if (completedDAGs.contains(dagID)) {
- // Skip events for completed DAGs
+ if (completedDAGs.contains(dagID)
+ || skippedDAGs.contains(dagID)) {
+ // Skip events for completed and skipped DAGs
// no need to recover completed DAGs
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping Recovery Event as either completed or skipped"
+ + ", dagId=" + dagID
+ + ", completed=" + completedDAGs.contains(dagID)
+ + ", skipped=" + skippedDAGs.contains(dagID)
+ + ", eventType=" + event.getHistoryEvent().getEventType());
+ }
return;
}
try {
- if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
- || eventType.equals(HistoryEventType.DAG_FINISHED)) {
- if (summaryStream == null) {
- Path summaryPath = new Path(recoveryPath,
- appContext.getApplicationID()
- + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ if (summaryStream == null) {
+ Path summaryPath = new Path(recoveryPath,
+ appContext.getApplicationID()
+ + TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX);
+ if (!recoveryDirFS.exists(summaryPath)) {
summaryStream = recoveryDirFS.create(summaryPath, false,
bufferSize);
+ } else {
+ summaryStream = recoveryDirFS.append(summaryPath, bufferSize);
}
+ }
+
+ if (eventType.equals(HistoryEventType.DAG_SUBMITTED)
+ || eventType.equals(HistoryEventType.DAG_FINISHED)) {
if (eventType.equals(HistoryEventType.DAG_SUBMITTED)) {
DAGSubmittedEvent dagSubmittedEvent =
(DAGSubmittedEvent) event.getHistoryEvent();
@@ -235,33 +274,97 @@ public class RecoveryService extends AbstractService {
&& dagName.startsWith(
TezConfiguration.TEZ_PREWARM_DAG_NAME_PREFIX)) {
// Skip recording pre-warm DAG events
+ skippedDAGs.add(dagID);
return;
}
- Path dagFilePath = new Path(recoveryPath,
- dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
- FSDataOutputStream outputStream =
- recoveryDirFS.create(dagFilePath, false, bufferSize);
- outputStreamMap.put(dagID, outputStream);
}
+ SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to summary stream"
+ + ", dagId=" + dagID
+ + ", type="
+ + event.getHistoryEvent().getEventType());
+ }
+ summaryEvent.toSummaryProtoStream(summaryStream);
+ }
- if (outputStreamMap.containsKey(dagID)) {
- SummaryEvent summaryEvent = (SummaryEvent) event.getHistoryEvent();
- summaryEvent.toSummaryProtoStream(summaryStream);
+ if (!outputStreamMap.containsKey(dagID)) {
+ Path dagFilePath = new Path(recoveryPath,
+ dagID.toString() + TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
+ FSDataOutputStream outputStream;
+ if (recoveryDirFS.exists(dagFilePath)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in append mode"
+ + ", filePath=" + dagFilePath);
+ }
+ outputStream = recoveryDirFS.append(dagFilePath, bufferSize);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening DAG recovery file in create mode"
+ + ", filePath=" + dagFilePath);
+ }
+ outputStream = recoveryDirFS.create(dagFilePath, false, bufferSize);
}
+ outputStreamMap.put(dagID, outputStream);
}
FSDataOutputStream outputStream = outputStreamMap.get(dagID);
- if (outputStream == null) {
- return;
- }
- outputStream.write(event.getHistoryEvent().getEventType().ordinal());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing recovery event to output stream"
+ + ", dagId=" + dagID
+ + ", type="
+ + event.getHistoryEvent().getEventType());
+ }
+ ++unflushedEventsCount;
+ outputStream.writeInt(event.getHistoryEvent().getEventType().ordinal());
event.getHistoryEvent().toProtoStream(outputStream);
+ if (!EnumSet.of(HistoryEventType.DAG_SUBMITTED,
+ HistoryEventType.DAG_FINISHED).contains(eventType)) {
+ maybeFlush(outputStream);
+ }
} catch (IOException ioe) {
- // TODO handle failures - treat as fatal or ignore?
+ // FIXME handle failures
LOG.warn("Failed to write to stream", ioe);
}
}
+  /**
+   * Flushes {@code outputStream} (via hflush, not hsync) when either trigger
+   * fires:
+   * <ul>
+   *   <li>the number of events written since the last flush has reached
+   *       {@code maxUnflushedEvents}, or</li>
+   *   <li>at least {@code flushInterval} seconds have elapsed since the last
+   *       flush.</li>
+   * </ul>
+   * A negative value for either setting disables that trigger.
+   *
+   * <p>NOTE(review): {@code unflushedEventsCount} and {@code lastFlushTime} are
+   * single fields shared by all DAG output streams, yet only the stream passed
+   * in is flushed here. Confirm this is intended.
+   *
+   * @param outputStream the DAG recovery stream that was just written to
+   * @throws IOException if flushing the stream fails
+   */
+  private void maybeFlush(FSDataOutputStream outputStream) throws IOException {
+    long currentTime = appContext.getClock().getTime();
+    // Guard with >= 0 so a negative limit disables the count trigger,
+    // mirroring the flushInterval check below, instead of flushing every event.
+    boolean countLimitReached = maxUnflushedEvents >= 0
+        && unflushedEventsCount >= maxUnflushedEvents;
+    // Multiply in long arithmetic: an int product overflows for intervals
+    // longer than ~24.8 days.
+    boolean intervalElapsed = flushInterval >= 0
+        && (currentTime - lastFlushTime) >= (flushInterval * 1000L);
+
+    if (!countLimitReached && !intervalElapsed) {
+      return;
+    }
+
+    doFlush(outputStream, currentTime, false);
+  }
+
+  /**
+   * Flushes {@code outputStream}, then resets the unflushed-event counter and
+   * records {@code currentTime} as the last flush time.
+   *
+   * @param outputStream stream to flush
+   * @param currentTime clock time to record as the new last-flush time
+   * @param sync if true, call {@code hsync()}, which also pushes the data to
+   *     the storage device. Otherwise call {@code hflush()}, which makes the
+   *     data visible to new readers without forcing it to disk.
+   * @throws IOException if the flush fails; the bookkeeping is then left
+   *     unchanged
+   */
+  private void doFlush(FSDataOutputStream outputStream,
+      long currentTime, boolean sync) throws IOException {
+    if (sync) {
+      outputStream.hsync();
+    } else {
+      outputStream.hflush();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Flushing output stream"
+          + ", lastFlushTime=" + lastFlushTime
+          + ", unflushedEventsCount=" + unflushedEventsCount
+          + ", maxUnflushedEvents=" + maxUnflushedEvents
+          + ", currentTime=" + currentTime);
+    }
+
+    unflushedEventsCount = 0;
+    lastFlushTime = currentTime;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
deleted file mode 100644
index 5b87bc8..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/recovery/RecoveryParser.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.dag.recovery;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.history.HistoryEvent;
-import org.apache.tez.dag.history.HistoryEventType;
-import org.apache.tez.dag.history.events.AMLaunchedEvent;
-import org.apache.tez.dag.history.events.AMStartedEvent;
-import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
-import org.apache.tez.dag.history.events.DAGFinishedEvent;
-import org.apache.tez.dag.history.events.DAGInitializedEvent;
-import org.apache.tez.dag.history.events.DAGStartedEvent;
-import org.apache.tez.dag.history.events.DAGSubmittedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
-import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
-import org.apache.tez.dag.history.events.TaskFinishedEvent;
-import org.apache.tez.dag.history.events.TaskStartedEvent;
-import org.apache.tez.dag.history.events.VertexDataMovementEventsGeneratedEvent;
-import org.apache.tez.dag.history.events.VertexFinishedEvent;
-import org.apache.tez.dag.history.events.VertexInitializedEvent;
-import org.apache.tez.dag.history.events.VertexStartedEvent;
-import org.apache.tez.dag.recovery.records.RecoveryProtos;
-
-import java.io.IOException;
-
-public class RecoveryParser {
-
- private static final Log LOG = LogFactory.getLog(RecoveryParser.class);
-
- Path recoveryDirectory;
- FileSystem recoveryDirFS;
-
- public RecoveryParser(Path recoveryDirectory, Configuration conf)
- throws IOException {
- this.recoveryDirectory = recoveryDirectory;
- recoveryDirFS = FileSystem.get(recoveryDirectory.toUri(), conf);
-
- }
-
- public void parse() throws IOException {
- RemoteIterator<LocatedFileStatus> locatedFilesStatus =
- recoveryDirFS.listFiles(recoveryDirectory, false);
- while (locatedFilesStatus.hasNext()) {
- LocatedFileStatus fileStatus = locatedFilesStatus.next();
- String fileName = fileStatus.getPath().getName();
- if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_RECOVER_FILE_SUFFIX)) {
- FSDataInputStream inputStream =
- recoveryDirFS.open(fileStatus.getPath());
- LOG.info("Parsing DAG file " + fileName);
- parseDAGRecoveryFile(inputStream);
- } else if (fileName.endsWith(TezConfiguration.DAG_RECOVERY_SUMMARY_FILE_SUFFIX)) {
- FSDataInputStream inputStream =
- recoveryDirFS.open(fileStatus.getPath());
- LOG.info("Parsing Summary file " + fileName);
- parseSummaryFile(inputStream);
- } else {
- LOG.warn("Encountered unknown file in recovery dir, fileName="
- + fileName);
- continue;
- }
- }
- }
-
- private void parseSummaryFile(FSDataInputStream inputStream)
- throws IOException {
- int counter = 0;
- while (inputStream.available() > 0) {
- RecoveryProtos.SummaryEventProto proto =
- RecoveryProtos.SummaryEventProto.parseDelimitedFrom(inputStream);
- LOG.info("[SUMMARY]"
- + " dagId=" + proto.getDagId()
- + ", timestamp=" + proto.getTimestamp()
- + ", event=" + HistoryEventType.values()[proto.getEventType()]);
- }
- }
-
- private void parseDAGRecoveryFile(FSDataInputStream inputStream)
- throws IOException {
- int counter = 0;
- while (inputStream.available() > 0) {
- int eventTypeOrdinal = inputStream.read();
- if (eventTypeOrdinal < 0 || eventTypeOrdinal >=
- HistoryEventType.values().length) {
- // Corrupt data
- // reached end
- LOG.warn("Corrupt data found when trying to read next event type");
- break;
- }
- HistoryEventType eventType = HistoryEventType.values()[eventTypeOrdinal];
- HistoryEvent event = null;
- switch (eventType) {
- case AM_LAUNCHED:
- event = new AMLaunchedEvent();
- break;
- case AM_STARTED:
- event = new AMStartedEvent();
- break;
- case DAG_SUBMITTED:
- event = new DAGSubmittedEvent();
- break;
- case DAG_INITIALIZED:
- event = new DAGInitializedEvent();
- break;
- case DAG_STARTED:
- event = new DAGStartedEvent();
- break;
- case DAG_FINISHED:
- event = new DAGFinishedEvent();
- break;
- case CONTAINER_LAUNCHED:
- event = new ContainerLaunchedEvent();
- break;
- case VERTEX_INITIALIZED:
- event = new VertexInitializedEvent();
- break;
- case VERTEX_STARTED:
- event = new VertexStartedEvent();
- break;
- case VERTEX_FINISHED:
- event = new VertexFinishedEvent();
- break;
- case TASK_STARTED:
- event = new TaskStartedEvent();
- break;
- case TASK_FINISHED:
- event = new TaskFinishedEvent();
- break;
- case TASK_ATTEMPT_STARTED:
- event = new TaskAttemptStartedEvent();
- break;
- case TASK_ATTEMPT_FINISHED:
- event = new TaskAttemptFinishedEvent();
- break;
- case VERTEX_DATA_MOVEMENT_EVENTS_GENERATED:
- event = new VertexDataMovementEventsGeneratedEvent();
- break;
- default:
- throw new IOException("Invalid data found, unknown event type "
- + eventType);
-
- }
- ++counter;
- LOG.info("Parsing event from input stream"
- + ", eventType=" + eventType
- + ", eventIndex=" + counter);
- event.fromProtoStream(inputStream);
- LOG.info("Parsed event from input stream"
- + ", eventType=" + eventType
- + ", eventIndex=" + counter
- + ", event=" + event.toString());
- }
- }
-
- public static void main(String argv[]) throws IOException {
- // TODO clean up with better usage and error handling
- Configuration conf = new Configuration();
- String dir = argv[0];
- RecoveryParser parser = new RecoveryParser(new Path(dir), conf);
- parser.parse();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e21f5df..65f3aaf 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -56,6 +56,9 @@ message DAGStartedProto {
optional int64 start_time = 3;
}
+// Serialized form of DAGCommitStartedEvent (which imports this message).
+// Presumably marks the start of a DAG's commit phase -- confirm against the
+// event class.
+message DAGCommitStartedProto {
+ // DAG ID in string form (presumably TezDAGID.toString()); verify.
+ optional string dag_id = 1;
+}
message DAGFinishedProto {
optional string dag_id = 1;
optional int64 finish_time = 2;
@@ -69,7 +72,8 @@ message VertexInitializedProto {
optional string vertex_id = 2;
optional int64 init_requested_time = 3;
optional int64 init_time = 4;
- optional int64 num_tasks = 5;
+ optional int32 num_tasks = 5;
+ repeated RootInputLeafOutputProto inputs = 6;
}
message VertexStartedProto {
@@ -79,6 +83,22 @@ message VertexStartedProto {
optional int64 start_time = 4;
}
+// One replacement edge manager recorded by VertexParallelismUpdatedEvent.
+message EdgeManagerDescriptorProto {
+ // Key of the corresponding entry in the event's sourceEdgeManagers map.
+ optional string edge_name = 1;
+ // EdgeManagerDescriptor converted with DagTypeConverters.convertToDAGPlan and
+ // restored with convertEdgeManagerDescriptorFromDAGPlan.
+ optional TezEntityDescriptorProto entity_descriptor = 2;
+}
+
+// Serialized form of VertexParallelismUpdatedEvent (a recovery-only event).
+message VertexParallelismUpdatedProto {
+ // Vertex ID written with TezVertexID.toString(), read with fromString().
+ optional string vertex_id = 1;
+ // Task count carried by the event.
+ optional int32 num_tasks = 2;
+ // Set only when the event carries a non-null location hint.
+ optional VertexLocationHintProto vertex_location_hint = 3;
+ // Replacement edge managers; empty when the event's map is null.
+ repeated EdgeManagerDescriptorProto edge_manager_descriptors = 4;
+}
+
+// NOTE(review): the Java event using this message is not visible here;
+// presumably it records the start of a vertex commit -- confirm.
+message VertexCommitStartedProto {
+ // Vertex ID in string form (presumably TezVertexID.toString()); verify.
+ optional string vertex_id = 1;
+}
+
message VertexFinishedProto {
optional string vertex_name = 1;
optional string vertex_id = 2;
@@ -100,6 +120,7 @@ message TaskFinishedProto {
optional int32 state = 3;
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
+ optional string successful_task_attempt_id = 6;
}
message TaskAttemptStartedProto {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/5b464f27/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
index e965bca..0a3b9e0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/api/client/TestVertexStatusBuilder.java
@@ -18,6 +18,7 @@
package org.apache.tez.dag.api.client;
+import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.dag.VertexState;
import org.junit.Assert;
@@ -32,7 +33,12 @@ public class TestVertexStatusBuilder {
VertexStatusBuilder.getProtoState(state);
VertexStatus.State clientState =
VertexStatus.getState(stateProto);
- Assert.assertEquals(state.name(), clientState.name());
+ if (state.equals(VertexState.RECOVERING)) {
+ Assert.assertEquals(clientState.name(),
+ State.NEW.name());
+ } else {
+ Assert.assertEquals(state.name(), clientState.name());
+ }
}
}